package org.apache.spark.streaming.kafka.v09;

import java.io.Serializable;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

/* loaded from: input_file:org/apache/spark/streaming/kafka/v09/JavaKafkaRDDSuite.class */
public class JavaKafkaRDDSuite implements Serializable {
    private transient JavaSparkContext sc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        this.sc = new JavaSparkContext(new SparkConf().setMaster("local[4]").setAppName(getClass().getSimpleName()));
    }

    @After
    public void tearDown() {
        if (this.sc != null) {
            this.sc.stop();
            this.sc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaRDD() throws InterruptedException {
        createTopicAndSendData("topic1_testKafkaRDD");
        createTopicAndSendData("topic2_testKafkaRDD");
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.kafkaTestUtils.brokerAddress());
        hashMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        hashMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        hashMap.put("spark.kafka.poll.time", "1000");
        OffsetRange[] offsetRangeArr = {OffsetRange.create("topic1_testKafkaRDD", 0, 0L, 1L), OffsetRange.create("topic2_testKafkaRDD", 0, 0L, 1L)};
        JavaRDD map = KafkaUtils.createRDD(this.sc, String.class, String.class, hashMap, offsetRangeArr).map(new Function<Tuple2<String, String>, String>() { // from class: org.apache.spark.streaming.kafka.v09.JavaKafkaRDDSuite.1
            public String call(Tuple2<String, String> tuple2) {
                return (String) tuple2._2();
            }
        });
        JavaRDD createRDD = KafkaUtils.createRDD(this.sc, String.class, String.class, String.class, hashMap, offsetRangeArr, new Function<ConsumerRecord<String, String>, String>() { // from class: org.apache.spark.streaming.kafka.v09.JavaKafkaRDDSuite.2
            public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                return (String) consumerRecord.value();
            }
        });
        JavaRDD createRDD2 = KafkaUtils.createRDD(this.sc, String.class, String.class, String.class, hashMap, offsetRangeArr, new Function<ConsumerRecord<String, String>, String>() { // from class: org.apache.spark.streaming.kafka.v09.JavaKafkaRDDSuite.3
            public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                return (String) consumerRecord.value();
            }
        });
        long count = map.count();
        long count2 = createRDD.count();
        long count3 = createRDD2.count();
        Assert.assertTrue(count > 0);
        Assert.assertEquals(count, count2);
        Assert.assertEquals(count, count3);
    }

    private String[] createTopicAndSendData(String str) {
        String[] strArr = {str + "-1", str + "-2", str + "-3"};
        this.kafkaTestUtils.createTopic(str);
        this.kafkaTestUtils.sendMessages(str, strArr);
        return strArr;
    }
}
