/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Random;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class JavaKafkaRDDSuite
implements Serializable {
    private transient JavaSparkContext sc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass().getSimpleName());
        this.sc = new JavaSparkContext(sparkConf);
    }

    @After
    public void tearDown() {
        if (this.sc != null) {
            this.sc.stop();
            this.sc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaRDD() throws InterruptedException {
        String topic1 = "topic1";
        String topic2 = "topic2";
        Random random = new Random();
        this.createTopicAndSendData(topic1);
        this.createTopicAndSendData(topic2);
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", this.kafkaTestUtils.brokerAddress());
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() + "-" + System.currentTimeMillis());
        OffsetRange[] offsetRanges = new OffsetRange[]{OffsetRange.create((String)topic1, (int)0, (long)0L, (long)1L), OffsetRange.create((String)topic2, (int)0, (long)0L, (long)1L)};
        HashMap<TopicPartition, String> leaders = new HashMap<TopicPartition, String>();
        String[] hostAndPort = this.kafkaTestUtils.brokerAddress().split(":");
        String broker = hostAndPort[0];
        leaders.put(offsetRanges[0].topicPartition(), broker);
        leaders.put(offsetRanges[1].topicPartition(), broker);
        Function<ConsumerRecord<String, String>, String> handler = new Function<ConsumerRecord<String, String>, String>(){

            public String call(ConsumerRecord<String, String> r) {
                return (String)r.value();
            }
        };
        JavaRDD rdd1 = KafkaUtils.createRDD((JavaSparkContext)this.sc, kafkaParams, (OffsetRange[])offsetRanges, (LocationStrategy)LocationStrategies.PreferFixed(leaders)).map((Function)handler);
        JavaRDD rdd2 = KafkaUtils.createRDD((JavaSparkContext)this.sc, kafkaParams, (OffsetRange[])offsetRanges, (LocationStrategy)LocationStrategies.PreferConsistent()).map((Function)handler);
        long count1 = rdd1.count();
        long count2 = rdd2.count();
        Assert.assertTrue((count1 > 0L ? 1 : 0) != 0);
        Assert.assertEquals((long)count1, (long)count2);
    }

    private String[] createTopicAndSendData(String topic) {
        String[] data = new String[]{topic + "-1", topic + "-2", topic + "-3"};
        this.kafkaTestUtils.createTopic(topic);
        this.kafkaTestUtils.sendMessages(topic, data);
        return data;
    }
}

