/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaTestUtils;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

public class JavaKafkaStreamSuite
implements Serializable {
    private transient JavaStreamingContext ssc = null;
    private transient Random random = new Random();
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass().getSimpleName());
        this.ssc = new JavaStreamingContext(sparkConf, new Duration(500L));
    }

    @After
    public void tearDown() {
        if (this.ssc != null) {
            this.ssc.stop();
            this.ssc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaStream() throws InterruptedException {
        String topic = "topic1";
        HashMap<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, 1);
        HashMap<String, Integer> sent = new HashMap<String, Integer>();
        sent.put("a", 5);
        sent.put("b", 3);
        sent.put("c", 10);
        this.kafkaTestUtils.createTopic(topic);
        this.kafkaTestUtils.sendMessages(topic, sent);
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zookeeper.connect", this.kafkaTestUtils.zkAddress());
        kafkaParams.put("group.id", "test-consumer-" + this.random.nextInt(10000));
        kafkaParams.put("auto.offset.reset", "smallest");
        JavaPairReceiverInputDStream stream = KafkaUtils.createStream((JavaStreamingContext)this.ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, (StorageLevel)StorageLevel.MEMORY_ONLY_SER());
        final Map result = Collections.synchronizedMap(new HashMap());
        JavaDStream words = stream.map((Function)new Function<Tuple2<String, String>, String>(){

            public String call(Tuple2<String, String> tuple2) throws Exception {
                return (String)tuple2._2();
            }
        });
        words.countByValue().foreachRDD((Function)new Function<JavaPairRDD<String, Long>, Void>(){

            public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
                List ret = rdd.collect();
                for (Tuple2 r : ret) {
                    if (result.containsKey(r._1())) {
                        result.put(r._1(), (Long)result.get(r._1()) + (Long)r._2());
                        continue;
                    }
                    result.put(r._1(), r._2());
                }
                return null;
            }
        });
        this.ssc.start();
        long startTime = System.currentTimeMillis();
        boolean sizeMatches = false;
        while (!sizeMatches && System.currentTimeMillis() - startTime < 20000L) {
            sizeMatches = sent.size() == result.size();
            Thread.sleep(200L);
        }
        Assert.assertEquals((long)sent.size(), (long)result.size());
        for (String k : sent.keySet()) {
            Assert.assertEquals((long)((Integer)sent.get(k)).intValue(), (long)((Long)result.get(k)).intValue());
        }
    }
}

