/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.HashMap;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.Broker;
import org.apache.spark.streaming.kafka.KafkaTestUtils;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

public class JavaKafkaRDDSuite
implements Serializable {
    private transient JavaSparkContext sc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass().getSimpleName());
        this.sc = new JavaSparkContext(sparkConf);
    }

    @After
    public void tearDown() {
        if (this.sc != null) {
            this.sc.stop();
            this.sc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaRDD() throws InterruptedException {
        String topic1 = "topic1";
        String topic2 = "topic2";
        this.createTopicAndSendData(topic1);
        this.createTopicAndSendData(topic2);
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", this.kafkaTestUtils.brokerAddress());
        OffsetRange[] offsetRanges = new OffsetRange[]{OffsetRange.create((String)topic1, (int)0, (long)0L, (long)1L), OffsetRange.create((String)topic2, (int)0, (long)0L, (long)1L)};
        HashMap emptyLeaders = new HashMap();
        HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>();
        String[] hostAndPort = this.kafkaTestUtils.brokerAddress().split(":");
        Broker broker = Broker.create((String)hostAndPort[0], (int)Integer.parseInt(hostAndPort[1]));
        leaders.put(new TopicAndPartition(topic1, 0), broker);
        leaders.put(new TopicAndPartition(topic2, 0), broker);
        JavaRDD rdd1 = KafkaUtils.createRDD((JavaSparkContext)this.sc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, (OffsetRange[])offsetRanges).map((Function)new Function<Tuple2<String, String>, String>(){

            public String call(Tuple2<String, String> kv) {
                return (String)kv._2();
            }
        });
        JavaRDD rdd2 = KafkaUtils.createRDD((JavaSparkContext)this.sc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, (OffsetRange[])offsetRanges, emptyLeaders, (Function)new Function<MessageAndMetadata<String, String>, String>(){

            public String call(MessageAndMetadata<String, String> msgAndMd) {
                return (String)msgAndMd.message();
            }
        });
        JavaRDD rdd3 = KafkaUtils.createRDD((JavaSparkContext)this.sc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, (OffsetRange[])offsetRanges, leaders, (Function)new Function<MessageAndMetadata<String, String>, String>(){

            public String call(MessageAndMetadata<String, String> msgAndMd) {
                return (String)msgAndMd.message();
            }
        });
        long count1 = rdd1.count();
        long count2 = rdd2.count();
        long count3 = rdd3.count();
        Assert.assertTrue((count1 > 0L ? 1 : 0) != 0);
        Assert.assertEquals((long)count1, (long)count2);
        Assert.assertEquals((long)count1, (long)count3);
    }

    private String[] createTopicAndSendData(String topic) {
        String[] data = new String[]{topic + "-1", topic + "-2", topic + "-3"};
        this.kafkaTestUtils.createTopic(topic);
        this.kafkaTestUtils.sendMessages(topic, data);
        return data;
    }
}

