/*
 * Decompiled with CFR 0.152.
 */
package test.org.apache.spark.streaming;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.JavaCheckpointTestUtils;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContextState;
import org.apache.spark.streaming.StreamingContextSuite;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.Utils;
import org.junit.Assert;
import org.junit.Test;
import org.sparkproject.guava.collect.Sets;
import org.sparkproject.guava.io.Files;
import scala.Tuple2;

public class JavaAPISuite
extends LocalJavaStreamingContext
implements Serializable {
    private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"dodgers"), new Tuple2((Object)"california", (Object)"giants"), new Tuple2((Object)"new york", (Object)"yankees"), new Tuple2((Object)"new york", (Object)"mets")), Arrays.asList(new Tuple2((Object)"california", (Object)"sharks"), new Tuple2((Object)"california", (Object)"ducks"), new Tuple2((Object)"new york", (Object)"rangers"), new Tuple2((Object)"new york", (Object)"islanders")));
    private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)1), new Tuple2((Object)"california", (Object)3), new Tuple2((Object)"new york", (Object)4), new Tuple2((Object)"new york", (Object)1)), Arrays.asList(new Tuple2((Object)"california", (Object)5), new Tuple2((Object)"california", (Object)5), new Tuple2((Object)"new york", (Object)3), new Tuple2((Object)"new york", (Object)1)));

    public static void equalIterator(Iterator<?> a, Iterator<?> b) {
        while (a.hasNext() && b.hasNext()) {
            Assert.assertEquals(a.next(), b.next());
        }
        Assert.assertEquals((Object)a.hasNext(), (Object)b.hasNext());
    }

    public static void equalIterable(Iterable<?> a, Iterable<?> b) {
        JavaAPISuite.equalIterator(a.iterator(), b.iterator());
    }

    @Test
    public void testInitialization() {
        Assert.assertNotNull((Object)this.ssc.sparkContext());
    }

    @Test
    public void testContextState() {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
        Assert.assertEquals((Object)StreamingContextState.INITIALIZED, (Object)this.ssc.getState());
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaTestUtils.attachTestOutputStream(stream);
        Assert.assertEquals((Object)StreamingContextState.INITIALIZED, (Object)this.ssc.getState());
        this.ssc.start();
        Assert.assertEquals((Object)StreamingContextState.ACTIVE, (Object)this.ssc.getState());
        this.ssc.stop();
        Assert.assertEquals((Object)StreamingContextState.STOPPED, (Object)this.ssc.getState());
    }

    @Test
    public void testCount() {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4, 5), Arrays.asList(3));
        List expected = Arrays.asList(Arrays.asList(4L), Arrays.asList(3L), Arrays.asList(1L));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream count = stream.count();
        JavaTestUtils.attachTestOutputStream(count);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testMap() {
        List inputData = Arrays.asList(Arrays.asList("hello", "world"), Arrays.asList("goodnight", "moon"));
        List expected = Arrays.asList(Arrays.asList(5, 5), Arrays.asList(9, 4));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream letterCount = stream.map(String::length);
        JavaTestUtils.attachTestOutputStream(letterCount);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testWindow() {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9));
        List expected = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6, 1, 2, 3), Arrays.asList(7, 8, 9, 4, 5, 6), Arrays.asList(7, 8, 9));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream windowed = stream.window(new Duration(2000L));
        JavaTestUtils.attachTestOutputStream(windowed);
        List result = JavaTestUtils.runStreams(this.ssc, 4, 4);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testWindowWithSlideDuration() {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9), Arrays.asList(10, 11, 12), Arrays.asList(13, 14, 15), Arrays.asList(16, 17, 18));
        List expected = Arrays.asList(Arrays.asList(1, 2, 3, 4, 5, 6), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), Arrays.asList(7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18), Arrays.asList(13, 14, 15, 16, 17, 18));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream windowed = stream.window(new Duration(4000L), new Duration(2000L));
        JavaTestUtils.attachTestOutputStream(windowed);
        List result = JavaTestUtils.runStreams(this.ssc, 8, 4);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testFilter() {
        List inputData = Arrays.asList(Arrays.asList("giants", "dodgers"), Arrays.asList("yankees", "red sox"));
        List expected = Arrays.asList(Arrays.asList("giants"), Arrays.asList("yankees"));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream filtered = stream.filter((Function & Serializable)s -> s.contains("a"));
        JavaTestUtils.attachTestOutputStream(filtered);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testRepartitionMorePartitions() {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 2);
        JavaDStream repartitioned = stream.repartition(4);
        JavaTestUtils.attachTestOutputStream(repartitioned);
        List result = JavaTestUtils.runStreamsWithPartitions(this.ssc, 2, 2);
        Assert.assertEquals((long)2L, (long)result.size());
        for (List rdd : result) {
            Assert.assertEquals((long)4L, (long)rdd.size());
            Assert.assertEquals((long)10L, (long)(rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()));
        }
    }

    @Test
    public void testRepartitionFewerPartitions() {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 4);
        JavaDStream repartitioned = stream.repartition(2);
        JavaTestUtils.attachTestOutputStream(repartitioned);
        List result = JavaTestUtils.runStreamsWithPartitions(this.ssc, 2, 2);
        Assert.assertEquals((long)2L, (long)result.size());
        for (List rdd : result) {
            Assert.assertEquals((long)2L, (long)rdd.size());
            Assert.assertEquals((long)10L, (long)(rdd.get(0).size() + rdd.get(1).size()));
        }
    }

    @Test
    public void testGlom() {
        List inputData = Arrays.asList(Arrays.asList("giants", "dodgers"), Arrays.asList("yankees", "red sox"));
        List<List> expected = Arrays.asList(Arrays.asList(Arrays.asList("giants", "dodgers")), Arrays.asList(Arrays.asList("yankees", "red sox")));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream glommed = stream.glom();
        JavaTestUtils.attachTestOutputStream(glommed);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testMapPartitions() {
        List inputData = Arrays.asList(Arrays.asList("giants", "dodgers"), Arrays.asList("yankees", "red sox"));
        List<List> expected = Arrays.asList(Arrays.asList("GIANTSDODGERS"), Arrays.asList("YANKEESRED SOX"));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream mapped = stream.mapPartitions((FlatMapFunction & Serializable)in -> {
            StringBuilder out = new StringBuilder();
            while (in.hasNext()) {
                out.append(((String)in.next()).toUpperCase(Locale.ROOT));
            }
            return Arrays.asList(out.toString()).iterator();
        });
        JavaTestUtils.attachTestOutputStream(mapped);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testReduce() {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9));
        List<List> expected = Arrays.asList(Arrays.asList(6), Arrays.asList(15), Arrays.asList(24));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream reduced = stream.reduce((Function2)new IntegerSum());
        JavaTestUtils.attachTestOutputStream(reduced);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testReduceByWindowWithInverse() {
        this.testReduceByWindow(true);
    }

    @Test
    public void testReduceByWindowWithoutInverse() {
        this.testReduceByWindow(false);
    }

    private void testReduceByWindow(boolean withInverse) {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9));
        List<List> expected = Arrays.asList(Arrays.asList(6), Arrays.asList(21), Arrays.asList(39), Arrays.asList(24));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream reducedWindowed = withInverse ? stream.reduceByWindow((Function2)new IntegerSum(), (Function2)new IntegerDifference(), new Duration(2000L), new Duration(1000L)) : stream.reduceByWindow((Function2)new IntegerSum(), new Duration(2000L), new Duration(1000L));
        JavaTestUtils.attachTestOutputStream(reducedWindowed);
        List result = JavaTestUtils.runStreams(this.ssc, 4, 4);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testQueueStream() {
        this.ssc.stop();
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("test").set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
        this.ssc = new JavaStreamingContext(conf, new Duration(1000L));
        List<List> expected = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9));
        JavaSparkContext jsc = new JavaSparkContext(this.ssc.ssc().sc());
        JavaRDD rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3));
        JavaRDD rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6));
        JavaRDD rdd3 = jsc.parallelize(Arrays.asList(7, 8, 9));
        LinkedList<JavaRDD> rdds = new LinkedList<JavaRDD>();
        rdds.add(rdd1);
        rdds.add(rdd2);
        rdds.add(rdd3);
        JavaDStream stream = this.ssc.queueStream(rdds);
        JavaTestUtils.attachTestOutputStream(stream);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testTransform() {
        List inputData = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9));
        List expected = Arrays.asList(Arrays.asList(3, 4, 5), Arrays.asList(6, 7, 8), Arrays.asList(9, 10, 11));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream transformed = stream.transform((Function & Serializable)in -> in.map((Function & Serializable)i -> i + 2));
        JavaTestUtils.attachTestOutputStream(transformed);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testVariousTransform() {
        List inputData = Arrays.asList(Arrays.asList(1));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        List pairInputData = Arrays.asList(Arrays.asList(new Tuple2((Object)"x", (Object)1)));
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(JavaTestUtils.attachTestInputStream(this.ssc, pairInputData, 1));
        stream.transform((Function & Serializable)in -> null);
        stream.transform((Function2 & Serializable)(in, time) -> null);
        stream.transformToPair((Function & Serializable)in -> null);
        stream.transformToPair((Function2 & Serializable)(in, time) -> null);
        pairStream.transform((Function & Serializable)in -> null);
        pairStream.transform((Function2 & Serializable)(in, time) -> null);
        pairStream.transformToPair((Function & Serializable)in -> null);
        pairStream.transformToPair((Function2 & Serializable)(in, time) -> null);
    }

    @Test
    public void testTransformWith() {
        List stringStringKVStream1 = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"dodgers"), new Tuple2((Object)"new york", (Object)"yankees")), Arrays.asList(new Tuple2((Object)"california", (Object)"sharks"), new Tuple2((Object)"new york", (Object)"rangers")));
        List stringStringKVStream2 = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"giants"), new Tuple2((Object)"new york", (Object)"mets")), Arrays.asList(new Tuple2((Object)"california", (Object)"ducks"), new Tuple2((Object)"new york", (Object)"islanders")));
        List<HashSet> expected = Arrays.asList(Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"california", (Object)new Tuple2((Object)"dodgers", (Object)"giants")), new Tuple2((Object)"new york", (Object)new Tuple2((Object)"yankees", (Object)"mets"))}), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"california", (Object)new Tuple2((Object)"sharks", (Object)"ducks")), new Tuple2((Object)"new york", (Object)new Tuple2((Object)"rangers", (Object)"islanders"))}));
        JavaDStream stream1 = JavaTestUtils.attachTestInputStream(this.ssc, stringStringKVStream1, 1);
        JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
        JavaDStream stream2 = JavaTestUtils.attachTestInputStream(this.ssc, stringStringKVStream2, 1);
        JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
        JavaPairDStream joined = pairStream1.transformWithToPair(pairStream2, (Function3 & Serializable)(rdd1, rdd2, time) -> rdd1.join(rdd2));
        JavaTestUtils.attachTestOutputStream(joined);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        ArrayList<HashSet> unorderedResult = new ArrayList<HashSet>();
        for (List res : result) {
            unorderedResult.add(Sets.newHashSet(res));
        }
        Assert.assertEquals(expected, unorderedResult);
    }

    @Test
    public void testVariousTransformWith() {
        List inputData1 = Arrays.asList(Arrays.asList(1));
        List inputData2 = Arrays.asList(Arrays.asList("x"));
        JavaDStream stream1 = JavaTestUtils.attachTestInputStream(this.ssc, inputData1, 1);
        JavaDStream stream2 = JavaTestUtils.attachTestInputStream(this.ssc, inputData2, 1);
        List pairInputData1 = Arrays.asList(Arrays.asList(new Tuple2((Object)"x", (Object)1)));
        List pairInputData2 = Arrays.asList(Arrays.asList(new Tuple2((Object)1.0, (Object)Character.valueOf('x'))));
        JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(JavaTestUtils.attachTestInputStream(this.ssc, pairInputData1, 1));
        JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(JavaTestUtils.attachTestInputStream(this.ssc, pairInputData2, 1));
        stream1.transformWith(stream2, (Function3 & Serializable)(rdd1, rdd2, time) -> null);
        stream1.transformWith(pairStream1, (Function3 & Serializable)(rdd1, rdd2, time) -> null);
        stream1.transformWithToPair(stream2, (Function3 & Serializable)(rdd1, rdd2, time) -> null);
        stream1.transformWithToPair(pairStream1, (Function3 & Serializable)(rdd1, rdd2, time) -> null);
        pairStream1.transformWith(stream2, (Function3 & Serializable)(rdd1, rdd2, time) -> null);
        pairStream1.transformWith(pairStream1, (Function3 & Serializable)(rdd1, rdd2, time) -> null);
        pairStream1.transformWithToPair(stream2, (Function3 & Serializable)(rdd1, rdd2, time) -> null);
        pairStream1.transformWithToPair(pairStream2, (Function3 & Serializable)(rdd1, rdd2, time) -> null);
    }

    @Test
    public void testStreamingContextTransform() {
        List stream1input = Arrays.asList(Arrays.asList(1), Arrays.asList(2));
        List stream2input = Arrays.asList(Arrays.asList(3), Arrays.asList(4));
        List pairStream1input = Arrays.asList(Arrays.asList(new Tuple2((Object)1, (Object)"x")), Arrays.asList(new Tuple2((Object)2, (Object)"y")));
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)1, (Object)new Tuple2((Object)1, (Object)"x"))), Arrays.asList(new Tuple2((Object)2, (Object)new Tuple2((Object)2, (Object)"y"))));
        JavaDStream stream1 = JavaTestUtils.attachTestInputStream(this.ssc, stream1input, 1);
        JavaDStream stream2 = JavaTestUtils.attachTestInputStream(this.ssc, stream2input, 1);
        JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(JavaTestUtils.attachTestInputStream(this.ssc, pairStream1input, 1));
        List<JavaDStream> listOfDStreams1 = Arrays.asList(stream1, stream2);
        this.ssc.transform(listOfDStreams1, (Function2 & Serializable)(listOfRDDs, time) -> {
            Assert.assertEquals((long)2L, (long)listOfRDDs.size());
            return null;
        });
        List<JavaDStream> listOfDStreams2 = Arrays.asList(stream1, stream2, pairStream1.toJavaDStream());
        JavaPairDStream transformed2 = this.ssc.transformToPair(listOfDStreams2, (Function2 & Serializable)(listOfRDDs, time) -> {
            Assert.assertEquals((long)3L, (long)listOfRDDs.size());
            JavaRDD rdd1 = (JavaRDD)listOfRDDs.get(0);
            JavaRDD rdd2 = (JavaRDD)listOfRDDs.get(1);
            JavaRDD rdd3 = (JavaRDD)listOfRDDs.get(2);
            JavaPairRDD prdd3 = JavaPairRDD.fromJavaRDD((JavaRDD)rdd3);
            PairFunction & Serializable mapToTuple = (PairFunction & Serializable)i -> new Tuple2(i, i);
            return rdd1.union(rdd2).mapToPair((PairFunction)mapToTuple).join(prdd3);
        });
        JavaTestUtils.attachTestOutputStream(transformed2);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testFlatMap() {
        List inputData = Arrays.asList(Arrays.asList("go", "giants"), Arrays.asList("boo", "dodgers"), Arrays.asList("athletics"));
        List expected = Arrays.asList(Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"), Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"), Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream flatMapped = stream.flatMap((FlatMapFunction & Serializable)x -> Arrays.asList(x.split("(?!^)")).iterator());
        JavaTestUtils.attachTestOutputStream(flatMapped);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testForeachRDD() {
        LongAccumulator accumRdd = this.ssc.sparkContext().sc().longAccumulator();
        LongAccumulator accumEle = this.ssc.sparkContext().sc().longAccumulator();
        List inputData = Arrays.asList(Arrays.asList(1, 1, 1), Arrays.asList(1, 1, 1));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaTestUtils.attachTestOutputStream(stream.count());
        stream.foreachRDD((VoidFunction & Serializable)rdd -> {
            accumRdd.add(1L);
            rdd.foreach((VoidFunction & Serializable)i -> accumEle.add(1L));
        });
        stream.foreachRDD((VoidFunction2 & Serializable)(rdd, time) -> {});
        JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals((long)2L, (long)accumRdd.value().intValue());
        Assert.assertEquals((long)6L, (long)accumEle.value().intValue());
    }

    @Test
    public void testPairFlatMap() {
        List inputData = Arrays.asList(Arrays.asList("giants"), Arrays.asList("dodgers"), Arrays.asList("athletics"));
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)6, (Object)"g"), new Tuple2((Object)6, (Object)"i"), new Tuple2((Object)6, (Object)"a"), new Tuple2((Object)6, (Object)"n"), new Tuple2((Object)6, (Object)"t"), new Tuple2((Object)6, (Object)"s")), Arrays.asList(new Tuple2((Object)7, (Object)"d"), new Tuple2((Object)7, (Object)"o"), new Tuple2((Object)7, (Object)"d"), new Tuple2((Object)7, (Object)"g"), new Tuple2((Object)7, (Object)"e"), new Tuple2((Object)7, (Object)"r"), new Tuple2((Object)7, (Object)"s")), Arrays.asList(new Tuple2((Object)9, (Object)"a"), new Tuple2((Object)9, (Object)"t"), new Tuple2((Object)9, (Object)"h"), new Tuple2((Object)9, (Object)"l"), new Tuple2((Object)9, (Object)"e"), new Tuple2((Object)9, (Object)"t"), new Tuple2((Object)9, (Object)"i"), new Tuple2((Object)9, (Object)"c"), new Tuple2((Object)9, (Object)"s")));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream flatMapped = stream.flatMapToPair((PairFlatMapFunction & Serializable)in -> {
            ArrayList<Tuple2> out = new ArrayList<Tuple2>();
            for (String letter : in.split("(?!^)")) {
                out.add(new Tuple2((Object)in.length(), (Object)letter));
            }
            return out.iterator();
        });
        JavaTestUtils.attachTestOutputStream(flatMapped);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testUnion() {
        List inputData1 = Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2), Arrays.asList(3, 3));
        List inputData2 = Arrays.asList(Arrays.asList(4, 4), Arrays.asList(5, 5), Arrays.asList(6, 6));
        List expected = Arrays.asList(Arrays.asList(1, 1, 4, 4), Arrays.asList(2, 2, 5, 5), Arrays.asList(3, 3, 6, 6));
        JavaDStream stream1 = JavaTestUtils.attachTestInputStream(this.ssc, inputData1, 2);
        JavaDStream stream2 = JavaTestUtils.attachTestInputStream(this.ssc, inputData2, 2);
        JavaDStream unioned = stream1.union(stream2);
        JavaTestUtils.attachTestOutputStream(unioned);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    public static <T> void assertOrderInvariantEquals(List<List<T>> expected, List<List<T>> actual) {
        ArrayList<Set<T>> expectedSets = new ArrayList<Set<T>>();
        for (List<T> list : expected) {
            expectedSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
        }
        ArrayList<Set<T>> actualSets = new ArrayList<Set<T>>();
        for (List<T> list : actual) {
            actualSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
        }
        Assert.assertEquals(expectedSets, actualSets);
    }

    @Test
    public void testPairFilter() {
        List inputData = Arrays.asList(Arrays.asList("giants", "dodgers"), Arrays.asList("yankees", "red sox"));
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"giants", (Object)6)), Arrays.asList(new Tuple2((Object)"yankees", (Object)7)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = stream.mapToPair((PairFunction & Serializable)in -> new Tuple2(in, (Object)in.length()));
        JavaPairDStream filtered = pairStream.filter((Function & Serializable)in -> ((String)in._1()).contains("a"));
        JavaTestUtils.attachTestOutputStream(filtered);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testPairMap() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)1, (Object)"california"), new Tuple2((Object)3, (Object)"california"), new Tuple2((Object)4, (Object)"new york"), new Tuple2((Object)1, (Object)"new york")), Arrays.asList(new Tuple2((Object)5, (Object)"california"), new Tuple2((Object)5, (Object)"california"), new Tuple2((Object)3, (Object)"new york"), new Tuple2((Object)1, (Object)"new york")));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream reversed = pairStream.mapToPair(Tuple2::swap);
        JavaTestUtils.attachTestOutputStream(reversed);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testPairMapPartitions() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)1, (Object)"california"), new Tuple2((Object)3, (Object)"california"), new Tuple2((Object)4, (Object)"new york"), new Tuple2((Object)1, (Object)"new york")), Arrays.asList(new Tuple2((Object)5, (Object)"california"), new Tuple2((Object)5, (Object)"california"), new Tuple2((Object)3, (Object)"new york"), new Tuple2((Object)1, (Object)"new york")));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream reversed = pairStream.mapPartitionsToPair((PairFlatMapFunction & Serializable)in -> {
            LinkedList<Tuple2> out = new LinkedList<Tuple2>();
            while (in.hasNext()) {
                Tuple2 next = (Tuple2)in.next();
                out.add(next.swap());
            }
            return out.iterator();
        });
        JavaTestUtils.attachTestOutputStream(reversed);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testPairMap2() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(1, 3, 4, 1), Arrays.asList(5, 5, 3, 1));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaDStream reversed = pairStream.map((Function & Serializable)in -> (Integer)in._2());
        JavaTestUtils.attachTestOutputStream(reversed);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testPairToPairFlatMapWithChangingTypes() {
        List inputData = Arrays.asList(Arrays.asList(new Tuple2((Object)"hi", (Object)1), new Tuple2((Object)"ho", (Object)2)), Arrays.asList(new Tuple2((Object)"hi", (Object)1), new Tuple2((Object)"ho", (Object)2)));
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)1, (Object)"h"), new Tuple2((Object)1, (Object)"i"), new Tuple2((Object)2, (Object)"h"), new Tuple2((Object)2, (Object)"o")), Arrays.asList(new Tuple2((Object)1, (Object)"h"), new Tuple2((Object)1, (Object)"i"), new Tuple2((Object)2, (Object)"h"), new Tuple2((Object)2, (Object)"o")));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream flatMapped = pairStream.flatMapToPair((PairFlatMapFunction & Serializable)in -> {
            LinkedList<Tuple2> out = new LinkedList<Tuple2>();
            char[] cArray = ((String)in._1()).toCharArray();
            int n = cArray.length;
            for (int i = 0; i < n; ++i) {
                Character s = Character.valueOf(cArray[i]);
                out.add(new Tuple2((Object)((Integer)in._2()), (Object)s.toString()));
            }
            return out.iterator();
        });
        JavaTestUtils.attachTestOutputStream(flatMapped);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testPairGroupByKey() {
        List inputData = this.stringStringKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", Arrays.asList("dodgers", "giants")), new Tuple2((Object)"new york", Arrays.asList("yankees", "mets"))), Arrays.asList(new Tuple2((Object)"california", Arrays.asList("sharks", "ducks")), new Tuple2((Object)"new york", Arrays.asList("rangers", "islanders"))));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream grouped = pairStream.groupByKey();
        JavaTestUtils.attachTestOutputStream(grouped);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals((long)expected.size(), (long)result.size());
        Iterator resultItr = result.iterator();
        Iterator<List> expectedItr = expected.iterator();
        while (resultItr.hasNext() && expectedItr.hasNext()) {
            Iterator resultElements = resultItr.next().iterator();
            Iterator expectedElements = expectedItr.next().iterator();
            while (resultElements.hasNext() && expectedElements.hasNext()) {
                Tuple2 resultElement = (Tuple2)resultElements.next();
                Tuple2 expectedElement = (Tuple2)expectedElements.next();
                Assert.assertEquals((Object)expectedElement._1(), (Object)resultElement._1());
                JavaAPISuite.equalIterable((Iterable)expectedElement._2(), (Iterable)resultElement._2());
            }
            Assert.assertEquals((Object)resultElements.hasNext(), (Object)expectedElements.hasNext());
        }
    }

    @Test
    public void testPairReduceByKey() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)4), new Tuple2((Object)"new york", (Object)5)), Arrays.asList(new Tuple2((Object)"california", (Object)10), new Tuple2((Object)"new york", (Object)4)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream reduced = pairStream.reduceByKey((Function2)new IntegerSum());
        JavaTestUtils.attachTestOutputStream(reduced);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testCombineByKey() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)4), new Tuple2((Object)"new york", (Object)5)), Arrays.asList(new Tuple2((Object)"california", (Object)10), new Tuple2((Object)"new york", (Object)4)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream combined = pairStream.combineByKey((Function & Serializable)i -> i, (Function2)new IntegerSum(), (Function2)new IntegerSum(), (Partitioner)new HashPartitioner(2));
        JavaTestUtils.attachTestOutputStream(combined);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testCountByValue() {
        List inputData = Arrays.asList(Arrays.asList("hello", "world"), Arrays.asList("hello", "moon"), Arrays.asList("hello"));
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"hello", (Object)1L), new Tuple2((Object)"world", (Object)1L)), Arrays.asList(new Tuple2((Object)"hello", (Object)1L), new Tuple2((Object)"moon", (Object)1L)), Arrays.asList(new Tuple2((Object)"hello", (Object)1L)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream counted = stream.countByValue();
        JavaTestUtils.attachTestOutputStream(counted);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testGroupByKeyAndWindow() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", Arrays.asList(1, 3)), new Tuple2((Object)"new york", Arrays.asList(1, 4))), Arrays.asList(new Tuple2((Object)"california", Arrays.asList(1, 3, 5, 5)), new Tuple2((Object)"new york", Arrays.asList(1, 1, 3, 4))), Arrays.asList(new Tuple2((Object)"california", Arrays.asList(5, 5)), new Tuple2((Object)"new york", Arrays.asList(1, 3))));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000L), new Duration(1000L));
        JavaTestUtils.attachTestOutputStream(groupWindowed);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        Assert.assertEquals((long)expected.size(), (long)result.size());
        for (int i = 0; i < result.size(); ++i) {
            Assert.assertEquals(JavaAPISuite.convert(expected.get(i)), JavaAPISuite.convert(result.get(i)));
        }
    }

    private static Set<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
        ArrayList<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
        for (Tuple2<String, List<Integer>> tuple : listOfTuples) {
            newListOfTuples.add(JavaAPISuite.convert(tuple));
        }
        return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
    }

    private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
        return new Tuple2((Object)((String)tuple._1()), new HashSet((Collection)tuple._2()));
    }

    @Test
    public void testReduceByKeyAndWindow() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)4), new Tuple2((Object)"new york", (Object)5)), Arrays.asList(new Tuple2((Object)"california", (Object)14), new Tuple2((Object)"new york", (Object)9)), Arrays.asList(new Tuple2((Object)"california", (Object)10), new Tuple2((Object)"new york", (Object)4)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream reduceWindowed = pairStream.reduceByKeyAndWindow((Function2)new IntegerSum(), new Duration(2000L), new Duration(1000L));
        JavaTestUtils.attachTestOutputStream(reduceWindowed);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testUpdateStateByKey() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)4), new Tuple2((Object)"new york", (Object)5)), Arrays.asList(new Tuple2((Object)"california", (Object)14), new Tuple2((Object)"new york", (Object)9)), Arrays.asList(new Tuple2((Object)"california", (Object)14), new Tuple2((Object)"new york", (Object)9)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream updated = pairStream.updateStateByKey((Function2 & Serializable)(values, state) -> {
            int out = 0;
            if (state.isPresent()) {
                out += ((Integer)state.get()).intValue();
            }
            for (Integer v : values) {
                out += v.intValue();
            }
            return Optional.of((Object)out);
        });
        JavaTestUtils.attachTestOutputStream(updated);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testUpdateStateByKeyWithInitial() {
        List inputData = this.stringIntKVStream;
        List<Tuple2> initial = Arrays.asList(new Tuple2((Object)"california", (Object)1), new Tuple2((Object)"new york", (Object)2));
        JavaRDD tmpRDD = this.ssc.sparkContext().parallelize(initial);
        JavaPairRDD initialRDD = JavaPairRDD.fromJavaRDD((JavaRDD)tmpRDD);
        List expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)5), new Tuple2((Object)"new york", (Object)7)), Arrays.asList(new Tuple2((Object)"california", (Object)15), new Tuple2((Object)"new york", (Object)11)), Arrays.asList(new Tuple2((Object)"california", (Object)15), new Tuple2((Object)"new york", (Object)11)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream updated = pairStream.updateStateByKey((Function2 & Serializable)(values, state) -> {
            int out = 0;
            if (state.isPresent()) {
                out += ((Integer)state.get()).intValue();
            }
            for (Integer v : values) {
                out += v.intValue();
            }
            return Optional.of((Object)out);
        }, (Partitioner)new HashPartitioner(1), initialRDD);
        JavaTestUtils.attachTestOutputStream(updated);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testReduceByKeyAndWindowWithInverse() {
        List inputData = this.stringIntKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)4), new Tuple2((Object)"new york", (Object)5)), Arrays.asList(new Tuple2((Object)"california", (Object)14), new Tuple2((Object)"new york", (Object)9)), Arrays.asList(new Tuple2((Object)"california", (Object)10), new Tuple2((Object)"new york", (Object)4)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream reduceWindowed = pairStream.reduceByKeyAndWindow((Function2)new IntegerSum(), (Function2)new IntegerDifference(), new Duration(2000L), new Duration(1000L));
        JavaTestUtils.attachTestOutputStream(reduceWindowed);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testCountByValueAndWindow() {
        List inputData = Arrays.asList(Arrays.asList("hello", "world"), Arrays.asList("hello", "moon"), Arrays.asList("hello"));
        List<HashSet> expected = Arrays.asList(Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"hello", (Object)1L), new Tuple2((Object)"world", (Object)1L)}), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"hello", (Object)2L), new Tuple2((Object)"world", (Object)1L), new Tuple2((Object)"moon", (Object)1L)}), Sets.newHashSet((Object[])new Tuple2[]{new Tuple2((Object)"hello", (Object)2L), new Tuple2((Object)"moon", (Object)1L)}));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream counted = stream.countByValueAndWindow(new Duration(2000L), new Duration(1000L));
        JavaTestUtils.attachTestOutputStream(counted);
        List result = JavaTestUtils.runStreams(this.ssc, 3, 3);
        ArrayList<HashSet> unorderedResult = new ArrayList<HashSet>();
        for (List res : result) {
            unorderedResult.add(Sets.newHashSet(res));
        }
        Assert.assertEquals(expected, unorderedResult);
    }

    @Test
    public void testPairTransform() {
        List inputData = Arrays.asList(Arrays.asList(new Tuple2((Object)3, (Object)5), new Tuple2((Object)1, (Object)5), new Tuple2((Object)4, (Object)5), new Tuple2((Object)2, (Object)5)), Arrays.asList(new Tuple2((Object)2, (Object)5), new Tuple2((Object)3, (Object)5), new Tuple2((Object)4, (Object)5), new Tuple2((Object)1, (Object)5)));
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)1, (Object)5), new Tuple2((Object)2, (Object)5), new Tuple2((Object)3, (Object)5), new Tuple2((Object)4, (Object)5)), Arrays.asList(new Tuple2((Object)1, (Object)5), new Tuple2((Object)2, (Object)5), new Tuple2((Object)3, (Object)5), new Tuple2((Object)4, (Object)5)));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream sorted = pairStream.transformToPair((Function & Serializable)in -> in.sortByKey());
        JavaTestUtils.attachTestOutputStream(sorted);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testPairToNormalRDDTransform() {
        List inputData = Arrays.asList(Arrays.asList(new Tuple2((Object)3, (Object)5), new Tuple2((Object)1, (Object)5), new Tuple2((Object)4, (Object)5), new Tuple2((Object)2, (Object)5)), Arrays.asList(new Tuple2((Object)2, (Object)5), new Tuple2((Object)3, (Object)5), new Tuple2((Object)4, (Object)5), new Tuple2((Object)1, (Object)5)));
        List<List> expected = Arrays.asList(Arrays.asList(3, 1, 4, 2), Arrays.asList(2, 3, 4, 1));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaDStream firstParts = pairStream.transform((Function & Serializable)in -> in.map((Function & Serializable)in2 -> (Integer)in2._1()));
        JavaTestUtils.attachTestOutputStream(firstParts);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testMapValues() {
        List inputData = this.stringStringKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"DODGERS"), new Tuple2((Object)"california", (Object)"GIANTS"), new Tuple2((Object)"new york", (Object)"YANKEES"), new Tuple2((Object)"new york", (Object)"METS")), Arrays.asList(new Tuple2((Object)"california", (Object)"SHARKS"), new Tuple2((Object)"california", (Object)"DUCKS"), new Tuple2((Object)"new york", (Object)"RANGERS"), new Tuple2((Object)"new york", (Object)"ISLANDERS")));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream mapped = pairStream.mapValues((Function & Serializable)s -> s.toUpperCase(Locale.ROOT));
        JavaTestUtils.attachTestOutputStream(mapped);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testFlatMapValues() {
        List inputData = this.stringStringKVStream;
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"dodgers1"), new Tuple2((Object)"california", (Object)"dodgers2"), new Tuple2((Object)"california", (Object)"giants1"), new Tuple2((Object)"california", (Object)"giants2"), new Tuple2((Object)"new york", (Object)"yankees1"), new Tuple2((Object)"new york", (Object)"yankees2"), new Tuple2((Object)"new york", (Object)"mets1"), new Tuple2((Object)"new york", (Object)"mets2")), Arrays.asList(new Tuple2((Object)"california", (Object)"sharks1"), new Tuple2((Object)"california", (Object)"sharks2"), new Tuple2((Object)"california", (Object)"ducks1"), new Tuple2((Object)"california", (Object)"ducks2"), new Tuple2((Object)"new york", (Object)"rangers1"), new Tuple2((Object)"new york", (Object)"rangers2"), new Tuple2((Object)"new york", (Object)"islanders1"), new Tuple2((Object)"new york", (Object)"islanders2")));
        JavaDStream stream = JavaTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream);
        JavaPairDStream flatMapped = pairStream.flatMapValues((FlatMapFunction & Serializable)in -> {
            ArrayList<String> out = new ArrayList<String>();
            out.add(in + "1");
            out.add(in + "2");
            return out.iterator();
        });
        JavaTestUtils.attachTestOutputStream(flatMapped);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testCoGroup() {
        List stringStringKVStream1 = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"dodgers"), new Tuple2((Object)"new york", (Object)"yankees")), Arrays.asList(new Tuple2((Object)"california", (Object)"sharks"), new Tuple2((Object)"new york", (Object)"rangers")));
        List stringStringKVStream2 = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"giants"), new Tuple2((Object)"new york", (Object)"mets")), Arrays.asList(new Tuple2((Object)"california", (Object)"ducks"), new Tuple2((Object)"new york", (Object)"islanders")));
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)new Tuple2(Arrays.asList("dodgers"), Arrays.asList("giants"))), new Tuple2((Object)"new york", (Object)new Tuple2(Arrays.asList("yankees"), Arrays.asList("mets")))), Arrays.asList(new Tuple2((Object)"california", (Object)new Tuple2(Arrays.asList("sharks"), Arrays.asList("ducks"))), new Tuple2((Object)"new york", (Object)new Tuple2(Arrays.asList("rangers"), Arrays.asList("islanders")))));
        JavaDStream stream1 = JavaTestUtils.attachTestInputStream(this.ssc, stringStringKVStream1, 1);
        JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
        JavaDStream stream2 = JavaTestUtils.attachTestInputStream(this.ssc, stringStringKVStream2, 1);
        JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
        JavaPairDStream grouped = pairStream1.cogroup(pairStream2);
        JavaTestUtils.attachTestOutputStream(grouped);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals((long)expected.size(), (long)result.size());
        Iterator resultItr = result.iterator();
        Iterator<List> expectedItr = expected.iterator();
        while (resultItr.hasNext() && expectedItr.hasNext()) {
            Iterator resultElements = resultItr.next().iterator();
            Iterator expectedElements = expectedItr.next().iterator();
            while (resultElements.hasNext() && expectedElements.hasNext()) {
                Tuple2 resultElement = (Tuple2)resultElements.next();
                Tuple2 expectedElement = (Tuple2)expectedElements.next();
                Assert.assertEquals((Object)expectedElement._1(), (Object)resultElement._1());
                JavaAPISuite.equalIterable((Iterable)((Tuple2)expectedElement._2())._1(), (Iterable)((Tuple2)resultElement._2())._1());
                JavaAPISuite.equalIterable((Iterable)((Tuple2)expectedElement._2())._2(), (Iterable)((Tuple2)resultElement._2())._2());
            }
            Assert.assertEquals((Object)resultElements.hasNext(), (Object)expectedElements.hasNext());
        }
    }

    @Test
    public void testJoin() {
        List stringStringKVStream1 = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"dodgers"), new Tuple2((Object)"new york", (Object)"yankees")), Arrays.asList(new Tuple2((Object)"california", (Object)"sharks"), new Tuple2((Object)"new york", (Object)"rangers")));
        List stringStringKVStream2 = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"giants"), new Tuple2((Object)"new york", (Object)"mets")), Arrays.asList(new Tuple2((Object)"california", (Object)"ducks"), new Tuple2((Object)"new york", (Object)"islanders")));
        List<List> expected = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)new Tuple2((Object)"dodgers", (Object)"giants")), new Tuple2((Object)"new york", (Object)new Tuple2((Object)"yankees", (Object)"mets"))), Arrays.asList(new Tuple2((Object)"california", (Object)new Tuple2((Object)"sharks", (Object)"ducks")), new Tuple2((Object)"new york", (Object)new Tuple2((Object)"rangers", (Object)"islanders"))));
        JavaDStream stream1 = JavaTestUtils.attachTestInputStream(this.ssc, stringStringKVStream1, 1);
        JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
        JavaDStream stream2 = JavaTestUtils.attachTestInputStream(this.ssc, stringStringKVStream2, 1);
        JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
        JavaPairDStream joined = pairStream1.join(pairStream2);
        JavaTestUtils.attachTestOutputStream(joined);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testLeftOuterJoin() {
        List stringStringKVStream1 = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"dodgers"), new Tuple2((Object)"new york", (Object)"yankees")), Arrays.asList(new Tuple2((Object)"california", (Object)"sharks")));
        List stringStringKVStream2 = Arrays.asList(Arrays.asList(new Tuple2((Object)"california", (Object)"giants")), Arrays.asList(new Tuple2((Object)"new york", (Object)"islanders")));
        List<List> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
        JavaDStream stream1 = JavaTestUtils.attachTestInputStream(this.ssc, stringStringKVStream1, 1);
        JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
        JavaDStream stream2 = JavaTestUtils.attachTestInputStream(this.ssc, stringStringKVStream2, 1);
        JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
        JavaPairDStream joined = pairStream1.leftOuterJoin(pairStream2);
        JavaDStream counted = joined.count();
        JavaTestUtils.attachTestOutputStream(counted);
        List result = JavaTestUtils.runStreams(this.ssc, 2, 2);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testCheckpointMasterRecovery() throws InterruptedException, IOException {
        List inputData = Arrays.asList(Arrays.asList("this", "is"), Arrays.asList("a", "test"), Arrays.asList("counting", "letters"));
        List expectedInitial = Arrays.asList(Arrays.asList(4, 2));
        List expectedFinal = Arrays.asList(Arrays.asList(1, 4), Arrays.asList(8, 7));
        File tempDir = Utils.createTempDir();
        tempDir.deleteOnExit();
        this.ssc.checkpoint(tempDir.getAbsolutePath());
        JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(this.ssc, inputData, 1);
        JavaDStream letterCount = stream.map(String::length);
        JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
        List initialResult = JavaTestUtils.runStreams(this.ssc, 1, 1);
        JavaAPISuite.assertOrderInvariantEquals(expectedInitial, initialResult);
        Thread.sleep(1000L);
        this.ssc.stop();
        this.ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
        List finalResult = JavaCheckpointTestUtils.runStreams(this.ssc, 2, 3);
        JavaAPISuite.assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
        this.ssc.stop();
        Utils.deleteRecursively((File)tempDir);
    }

    @Test
    public void testContextGetOrCreate() throws IOException {
        this.ssc.stop();
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("test").set("newContext", "true");
        File emptyDir = Utils.createTempDir();
        emptyDir.deleteOnExit();
        StreamingContextSuite contextSuite = new StreamingContextSuite();
        String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
        String checkpointDir = contextSuite.createValidCheckpoint();
        AtomicBoolean newContextCreated = new AtomicBoolean(false);
        Function0 & Serializable creatingFunc = (Function0 & Serializable)() -> {
            newContextCreated.set(true);
            return new JavaStreamingContext(conf, Seconds.apply((long)1L));
        };
        newContextCreated.set(false);
        this.ssc = JavaStreamingContext.getOrCreate((String)emptyDir.getAbsolutePath(), (Function0)creatingFunc);
        Assert.assertTrue((String)"new context not created", (boolean)newContextCreated.get());
        this.ssc.stop();
        newContextCreated.set(false);
        this.ssc = JavaStreamingContext.getOrCreate((String)corruptedCheckpointDir, (Function0)creatingFunc, (Configuration)new Configuration(), (boolean)true);
        Assert.assertTrue((String)"new context not created", (boolean)newContextCreated.get());
        this.ssc.stop();
        newContextCreated.set(false);
        this.ssc = JavaStreamingContext.getOrCreate((String)checkpointDir, (Function0)creatingFunc, (Configuration)new Configuration());
        Assert.assertTrue((String)"old context not recovered", (!newContextCreated.get() ? 1 : 0) != 0);
        this.ssc.stop();
        newContextCreated.set(false);
        JavaSparkContext sc = new JavaSparkContext(conf);
        this.ssc = JavaStreamingContext.getOrCreate((String)checkpointDir, (Function0)creatingFunc, (Configuration)new Configuration());
        Assert.assertTrue((String)"old context not recovered", (!newContextCreated.get() ? 1 : 0) != 0);
        this.ssc.stop();
    }

    @Test
    public void testSocketTextStream() {
        this.ssc.socketTextStream("localhost", 12345);
    }

    @Test
    public void testSocketString() {
        this.ssc.socketStream("localhost", 12345, (Function & Serializable)in -> {
            ArrayList<String> out = new ArrayList<String>();
            try (BufferedReader reader = new BufferedReader(new InputStreamReader((InputStream)in, StandardCharsets.UTF_8));){
                String line;
                while ((line = reader.readLine()) != null) {
                    out.add(line);
                }
            }
            return out;
        }, StorageLevel.MEMORY_ONLY());
    }

    @Test
    public void testTextFileStream() throws IOException {
        File testDir = Utils.createTempDir((String)System.getProperty("java.io.tmpdir"), (String)"spark");
        List expected = JavaAPISuite.fileTestPrepare(testDir);
        JavaDStream input = this.ssc.textFileStream(testDir.toString());
        JavaTestUtils.attachTestOutputStream(input);
        List result = JavaTestUtils.runStreams(this.ssc, 1, 1);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testFileStream() throws IOException {
        File testDir = Utils.createTempDir((String)System.getProperty("java.io.tmpdir"), (String)"spark");
        List expected = JavaAPISuite.fileTestPrepare(testDir);
        JavaPairInputDStream inputStream = this.ssc.fileStream(testDir.toString(), LongWritable.class, Text.class, TextInputFormat.class, (Function & Serializable)v1 -> Boolean.TRUE, true);
        JavaDStream test = inputStream.map((Function & Serializable)v1 -> ((Text)v1._2()).toString());
        JavaTestUtils.attachTestOutputStream(test);
        List result = JavaTestUtils.runStreams(this.ssc, 1, 1);
        JavaAPISuite.assertOrderInvariantEquals(expected, result);
    }

    @Test
    public void testRawSocketStream() {
        this.ssc.rawSocketStream("localhost", 12345);
    }

    private static List<List<String>> fileTestPrepare(File testDir) throws IOException {
        File existingFile = new File(testDir, "0");
        Files.write((CharSequence)"0\n", (File)existingFile, (Charset)StandardCharsets.UTF_8);
        Assert.assertTrue((boolean)existingFile.setLastModified(1000L));
        Assert.assertEquals((long)1000L, (long)existingFile.lastModified());
        return Arrays.asList(Arrays.asList("0"));
    }

    private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable, Text> pds) {
        pds.saveAsNewAPIHadoopFiles("", "", LongWritable.class, Text.class, org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
        pds.saveAsHadoopFiles("", "", LongWritable.class, Text.class, SequenceFileOutputFormat.class);
        pds.saveAsNewAPIHadoopFiles("", "", LongWritable.class, Text.class, org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
        pds.saveAsHadoopFiles("", "", LongWritable.class, Text.class, SequenceFileOutputFormat.class);
    }

    private static class IntegerDifference
    implements Function2<Integer, Integer, Integer> {
        private IntegerDifference() {
        }

        public Integer call(Integer i1, Integer i2) {
            return i1 - i2;
        }
    }

    private static class IntegerSum
    implements Function2<Integer, Integer, Integer> {
        private IntegerSum() {
        }

        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    }
}

