package org.apache.spark.streaming;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.spark_project.guava.io.Closeables;

/* loaded from: input_file:org/apache/spark/streaming/JavaReceiverAPISuite.class */
public class JavaReceiverAPISuite implements Serializable {

    /* loaded from: input_file:org/apache/spark/streaming/JavaReceiverAPISuite$JavaSocketReceiver.class */
    private static class JavaSocketReceiver extends Receiver<String> {
        private String host;
        private int port;

        JavaSocketReceiver(String str, int i) {
            super(StorageLevel.MEMORY_AND_DISK());
            this.host = null;
            this.port = -1;
            this.host = str;
            this.port = i;
        }

        /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.spark.streaming.JavaReceiverAPISuite$JavaSocketReceiver$1] */
        public void onStart() {
            new Thread() { // from class: org.apache.spark.streaming.JavaReceiverAPISuite.JavaSocketReceiver.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    JavaSocketReceiver.this.receive();
                }
            }.start();
        }

        public void onStop() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void receive() {
            Socket socket = null;
            BufferedReader bufferedReader = null;
            try {
                try {
                    socket = new Socket(this.host, this.port);
                    bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
                    while (true) {
                        Object readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            Closeables.close(bufferedReader, true);
                            Closeables.close(socket, true);
                            return;
                        }
                        store(readLine);
                    }
                } catch (Throwable th) {
                    Closeables.close(bufferedReader, true);
                    Closeables.close(socket, true);
                    throw th;
                }
            } catch (ConnectException e) {
                e.printStackTrace();
                restart("Could not connect", e);
            } catch (Throwable th2) {
                th2.printStackTrace();
                restart("Error receiving data", th2);
            }
        }
    }

    @Before
    public void setUp() {
        System.clearProperty("spark.streaming.clock");
    }

    @After
    public void tearDown() {
        System.clearProperty("spark.streaming.clock");
    }

    @Test
    public void testReceiver() throws InterruptedException {
        TestServer testServer = new TestServer(0);
        testServer.start();
        final AtomicLong atomicLong = new AtomicLong(0L);
        try {
            JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[2]", "test", new Duration(200L));
            javaStreamingContext.receiverStream(new JavaSocketReceiver("localhost", testServer.port())).map(new Function<String, String>() { // from class: org.apache.spark.streaming.JavaReceiverAPISuite.1
                public String call(String str) {
                    return str + ".";
                }
            }).foreachRDD(new VoidFunction<JavaRDD<String>>() { // from class: org.apache.spark.streaming.JavaReceiverAPISuite.2
                public void call(JavaRDD<String> javaRDD) {
                    atomicLong.addAndGet(javaRDD.count());
                }
            });
            javaStreamingContext.start();
            long currentTimeMillis = System.currentTimeMillis();
            Thread.sleep(200L);
            for (int i = 0; i < 6; i++) {
                testServer.send(i + "\n");
                Thread.sleep(100L);
            }
            while (atomicLong.get() == 0 && System.currentTimeMillis() - currentTimeMillis < 10000) {
                Thread.sleep(100L);
            }
            javaStreamingContext.stop();
            Assert.assertTrue(atomicLong.get() > 0);
            testServer.stop();
        } catch (Throwable th) {
            testServer.stop();
            throw th;
        }
    }
}
