/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.scheduler;

import java.io.File;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.BatchCounter;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.TestReceiver;
import org.apache.spark.streaming.TestServer;
import org.apache.spark.streaming.TestSuiteBase;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.streaming.scheduler.JobGeneratorSuite$;
import org.apache.spark.streaming.scheduler.ReceiverTracker;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.Tag;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.Span$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.collection.Seq;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\r3A\u0001C\u0005\u0001)!)Q\u0004\u0001C\u0001=\u001d)\u0011%\u0003E\u0001E\u0019)\u0001\"\u0003E\u0001G!)Qd\u0001C\u0001[!9af\u0001b\u0001\n\u0003y\u0003B\u0002\u001e\u0004A\u0003%\u0001\u0007C\u0004<\u0007\u0005\u0005I\u0011\u0002\u001f\u0003#){'mR3oKJ\fGo\u001c:Tk&$XM\u0003\u0002\u000b\u0017\u0005I1o\u00195fIVdWM\u001d\u0006\u0003\u00195\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u00059y\u0011!B:qCJ\\'B\u0001\t\u0012\u0003\u0019\t\u0007/Y2iK*\t!#A\u0002pe\u001e\u001c\u0001aE\u0002\u0001+e\u0001\"AF\f\u000e\u00035I!\u0001G\u0007\u0003\u001bM\u0003\u0018M]6Gk:\u001cV/\u001b;f!\tQ2$D\u0001\f\u0013\ta2BA\u0007UKN$8+^5uK\n\u000b7/Z\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003}\u0001\"\u0001\t\u0001\u000e\u0003%\t\u0011CS8c\u000f\u0016tWM]1u_J\u001cV/\u001b;f!\t\u00013aE\u0002\u0004I)\u0002\"!\n\u0015\u000e\u0003\u0019R\u0011aJ\u0001\u0006g\u000e\fG.Y\u0005\u0003S\u0019\u0012a!\u00118z%\u00164\u0007CA\u0013,\u0013\tacE\u0001\u0007TKJL\u0017\r\\5{C\ndW\rF\u0001#\u0003%9\u0018-\u001b;MCR\u001c\u0007.F\u00011!\t\t\u0004(D\u00013\u0015\t\u0019D'\u0001\u0006d_:\u001cWO\u001d:f]RT!!\u000e\u001c\u0002\tU$\u0018\u000e\u001c\u0006\u0002o\u0005!!.\u0019<b\u0013\tI$G\u0001\bD_VtG\u000fR8x]2\u000bGo\u00195\u0002\u0015]\f\u0017\u000e\u001e'bi\u000eD\u0007%A\u0006sK\u0006$'+Z:pYZ,G#A\u001f\u0011\u0005y\nU\"A \u000b\u0005\u00013\u0014\u0001\u00027b]\u001eL!AQ \u0003\r=\u0013'.Z2u\u0001")
// Test suite (decompiled from Scala) that registers a single test,
// "SPARK-6222: Do not clear received block data too soon".
//
// Most instance methods are forwarders that the Scala compiler generated for
// the TestSuiteBase trait: each one calls the trait's static `name$` default
// implementation with this suite as the receiver. The test logic itself lives
// in the constructor and in the static `$anonfun$new$N` / `...$1` helpers.
public class JobGeneratorSuite
extends SparkFunSuite
implements TestSuiteBase {
    /** Lazily computed checkpoint directory; valid only once {@code bitmap$0} is true. */
    private String checkpointDir;
    // Not final: these two fields are assigned through the trait-setter methods
    // below (called outside the constructor body), which Java does not allow
    // for final fields.
    private SparkConf conf;
    private PatienceConfiguration.Timeout eventuallyTimeout;
    /** Becomes true after {@code checkpointDir} has been computed. */
    private volatile boolean bitmap$0;

    /** Returns the latch held by the companion object {@code JobGeneratorSuite$}. */
    public static CountDownLatch waitLatch() {
        return JobGeneratorSuite$.MODULE$.waitLatch();
    }

    /** Super-accessor used by TestSuiteBase to reach {@code BeforeAndAfterEach.beforeEach}. */
    @Override
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$beforeEach() {
        BeforeAndAfterEach.beforeEach$((BeforeAndAfterEach)this);
    }

    /** Super-accessor used by TestSuiteBase to reach {@code BeforeAndAfterEach.afterEach}. */
    @Override
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$afterEach() {
        BeforeAndAfterEach.afterEach$((BeforeAndAfterEach)this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public String framework() {
        return TestSuiteBase.framework$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public String master() {
        return TestSuiteBase.master$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public Duration batchDuration() {
        return TestSuiteBase.batchDuration$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public int numInputPartitions() {
        return TestSuiteBase.numInputPartitions$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public int maxWaitTimeMillis() {
        return TestSuiteBase.maxWaitTimeMillis$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public boolean useManualClock() {
        return TestSuiteBase.useManualClock$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public boolean actuallyWait() {
        return TestSuiteBase.actuallyWait$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public void beforeFunction() {
        TestSuiteBase.beforeFunction$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public void afterFunction() {
        TestSuiteBase.afterFunction$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public void beforeEach() {
        TestSuiteBase.beforeEach$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public void afterEach() {
        TestSuiteBase.afterEach$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public <R> R withStreamingContext(StreamingContext ssc, Function1<StreamingContext, R> block) {
        return (R)TestSuiteBase.withStreamingContext$(this, ssc, block);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public <R> R withTestServer(TestServer testServer, Function1<TestServer, R> block) {
        return (R)TestSuiteBase.withTestServer$(this, testServer, block);
    }

    /** Forwards to the TestSuiteBase default implementation (single-input variant). */
    @Override
    public <U, V> StreamingContext setupStreams(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, int numPartitions, ClassTag<U> evidence$4, ClassTag<V> evidence$5) {
        return TestSuiteBase.setupStreams$(this, input, operation, numPartitions, evidence$4, evidence$5);
    }

    /** Default value of the third parameter of {@code setupStreams}, as supplied by TestSuiteBase. */
    @Override
    public <U, V> int setupStreams$default$3() {
        return TestSuiteBase.setupStreams$default$3$(this);
    }

    /** Forwards to the TestSuiteBase default implementation (two-input variant). */
    @Override
    public <U, V, W> StreamingContext setupStreams(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, ClassTag<U> evidence$6, ClassTag<V> evidence$7, ClassTag<W> evidence$8) {
        return TestSuiteBase.setupStreams$(this, input1, input2, operation, evidence$6, evidence$7, evidence$8);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public <V> Seq<Seq<V>> runStreams(StreamingContext ssc, int numBatches, int numExpectedOutput, Function0<BoxedUnit> preStop, ClassTag<V> evidence$9) {
        return TestSuiteBase.runStreams$(this, ssc, numBatches, numExpectedOutput, preStop, evidence$9);
    }

    /** Default value of the fourth parameter of {@code runStreams}, as supplied by TestSuiteBase. */
    @Override
    public <V> Function0<BoxedUnit> runStreams$default$4() {
        return TestSuiteBase.runStreams$default$4$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public <V> Seq<Seq<Seq<V>>> runStreamsWithPartitions(StreamingContext ssc, int numBatches, int numExpectedOutput, Function0<BoxedUnit> preStop, ClassTag<V> evidence$10) {
        return TestSuiteBase.runStreamsWithPartitions$(this, ssc, numBatches, numExpectedOutput, preStop, evidence$10);
    }

    /** Default value of the fourth parameter of {@code runStreamsWithPartitions}, as supplied by TestSuiteBase. */
    @Override
    public <V> Function0<BoxedUnit> runStreamsWithPartitions$default$4() {
        return TestSuiteBase.runStreamsWithPartitions$default$4$(this);
    }

    /** Forwards to the TestSuiteBase default implementation. */
    @Override
    public <V> void verifyOutput(Seq<Seq<V>> output, Seq<Seq<V>> expectedOutput, boolean useSet, ClassTag<V> evidence$11) {
        TestSuiteBase.verifyOutput$(this, output, expectedOutput, useSet, evidence$11);
    }

    /** Forwards to the TestSuiteBase default implementation (single input, no explicit batch count). */
    @Override
    public <U, V> void testOperation(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, Seq<Seq<V>> expectedOutput, boolean useSet, ClassTag<U> evidence$12, ClassTag<V> evidence$13) {
        TestSuiteBase.testOperation$(this, input, operation, expectedOutput, useSet, evidence$12, evidence$13);
    }

    /** Default value of the fourth parameter of {@code testOperation}, as supplied by TestSuiteBase. */
    @Override
    public <U, V> boolean testOperation$default$4() {
        return TestSuiteBase.testOperation$default$4$(this);
    }

    /** Forwards to the TestSuiteBase default implementation (single input, explicit batch count). */
    @Override
    public <U, V> void testOperation(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, Seq<Seq<V>> expectedOutput, int numBatches, boolean useSet, ClassTag<U> evidence$14, ClassTag<V> evidence$15) {
        TestSuiteBase.testOperation$(this, input, operation, expectedOutput, numBatches, useSet, evidence$14, evidence$15);
    }

    /** Forwards to the TestSuiteBase default implementation (two inputs, no explicit batch count). */
    @Override
    public <U, V, W> void testOperation(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, Seq<Seq<W>> expectedOutput, boolean useSet, ClassTag<U> evidence$16, ClassTag<V> evidence$17, ClassTag<W> evidence$18) {
        TestSuiteBase.testOperation$(this, input1, input2, operation, expectedOutput, useSet, evidence$16, evidence$17, evidence$18);
    }

    /** Forwards to the TestSuiteBase default implementation (two inputs, explicit batch count). */
    @Override
    public <U, V, W> void testOperation(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, Seq<Seq<W>> expectedOutput, int numBatches, boolean useSet, ClassTag<U> evidence$19, ClassTag<V> evidence$20, ClassTag<W> evidence$21) {
        TestSuiteBase.testOperation$(this, input1, input2, operation, expectedOutput, numBatches, useSet, evidence$19, evidence$20, evidence$21);
    }

    /**
     * Slow path of {@link #checkpointDir()}: while holding this instance's monitor,
     * computes the value through TestSuiteBase if it has not been computed yet and
     * then sets the {@code bitmap$0} flag.
     */
    private String checkpointDir$lzycompute() {
        JobGeneratorSuite jobGeneratorSuite = this;
        synchronized (jobGeneratorSuite) {
            // Re-check under the lock so the value is computed at most once.
            if (!this.bitmap$0) {
                this.checkpointDir = TestSuiteBase.checkpointDir$(this);
                this.bitmap$0 = true;
            }
        }
        return this.checkpointDir;
    }

    /** Returns the checkpoint directory, computing it on first access. */
    @Override
    public String checkpointDir() {
        if (!this.bitmap$0) {
            return this.checkpointDir$lzycompute();
        }
        return this.checkpointDir;
    }

    /** Returns the SparkConf stored by the TestSuiteBase setter. */
    @Override
    public SparkConf conf() {
        return this.conf;
    }

    /** Returns the timeout stored by the TestSuiteBase setter. */
    @Override
    public PatienceConfiguration.Timeout eventuallyTimeout() {
        return this.eventuallyTimeout;
    }

    /** Trait setter: stores the {@code conf} value supplied by TestSuiteBase. */
    @Override
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(SparkConf x$1) {
        this.conf = x$1;
    }

    /** Trait setter: stores the {@code eventuallyTimeout} value supplied by TestSuiteBase. */
    @Override
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(PatienceConfiguration.Timeout x$1) {
        this.eventuallyTimeout = x$1;
    }

    /**
     * Body of the {@code foreachRDD} action registered by the test. For the batch
     * whose time equals {@code longBatchTime$1} it blocks until the shared latch
     * has been counted down to zero; for every other batch it does nothing.
     *
     * @param longBatchTime$1 batch time, in milliseconds, of the batch to hold up
     * @param x$1 the batch's RDD (unused)
     * @param time the time of the batch being processed
     */
    public static final /* synthetic */ void $anonfun$new$3(long longBatchTime$1, RDD x$1, Time time) {
        if (time.milliseconds() == longBatchTime$1) {
            while (JobGeneratorSuite$.MODULE$.waitLatch().getCount() > 0L) {
                JobGeneratorSuite$.MODULE$.waitLatch().await();
            }
        }
    }

    /** Looks up the blocks the receiver tracker reports for the given batch time and input stream. */
    private static final Seq getBlocksOfBatch$1(long batchTime, ReceiverTracker receiverTracker$1, ReceiverInputDStream inputStream$1) {
        return receiverTracker$1.getBlocksOfBatchAndStream(new Time(batchTime), inputStream$1.id());
    }

    /** Retries, within {@code testTimeout$1}, until the receiver tracker reports unallocated blocks. */
    private static final void waitForNewReceivedBlocks$1(PatienceConfiguration.Timeout testTimeout$1, ReceiverTracker receiverTracker$1) {
        Eventually$.MODULE$.eventually(testTimeout$1, (Function0 & Serializable & scala.Serializable)() -> {
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(receiverTracker$1.hasUnallocatedBlocks(), "receiverTracker.hasUnallocatedBlocks", Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 98));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 97));
    }

    /** Retries, within {@code testTimeout$1}, until the batch at {@code batchTime} has a non-empty block list. */
    private static final void waitForBlocksToBeAllocatedToBatch$1(long batchTime, PatienceConfiguration.Timeout testTimeout$1, ReceiverTracker receiverTracker$1, ReceiverInputDStream inputStream$1) {
        Eventually$.MODULE$.eventually(testTimeout$1, (Function0 & Serializable & scala.Serializable)() -> {
            Seq $org_scalatest_assert_macro_left = JobGeneratorSuite.getBlocksOfBatch$1(batchTime, receiverTracker$1, inputStream$1);
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left, "nonEmpty", $org_scalatest_assert_macro_left.nonEmpty(), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 105));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 104));
    }

    /**
     * Test body executed inside {@code withStreamingContext}. It holds up the job of
     * batch 3 on the shared latch, advances the manual clock through 10 batches, and
     * then asserts that the blocks of the held-up batch are still present and that
     * fewer than 3 batches have completed, before releasing the latch and stopping
     * the context.
     *
     * @param $this the suite instance (used for {@code batchDuration} and assertions)
     * @param checkpointDir$1 directory passed to {@code ssc.checkpoint}
     * @param ssc the streaming context under test
     */
    public static final /* synthetic */ void $anonfun$new$2(JobGeneratorSuite $this, File checkpointDir$1, StreamingContext ssc) {
        ManualClock clock = (ManualClock)ssc.scheduler().clock();
        int numBatches = 10;
        int longBatchNumber = 3;
        long longBatchTime = (long)longBatchNumber * $this.batchDuration().milliseconds();
        PatienceConfiguration.Timeout testTimeout = Eventually$.MODULE$.timeout(Span$.MODULE$.convertDurationToSpan((scala.concurrent.duration.Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds()));
        ReceiverInputDStream inputStream = ssc.receiverStream((Receiver)new TestReceiver(), ClassTag$.MODULE$.Int());
        // Output action that blocks on the latch for the batch at longBatchTime.
        inputStream.foreachRDD((Function2 & Serializable & scala.Serializable)(x$1, time) -> {
            JobGeneratorSuite.$anonfun$new$3(longBatchTime, x$1, time);
            return BoxedUnit.UNIT;
        });
        BatchCounter batchCounter = new BatchCounter(ssc);
        ssc.checkpoint(checkpointDir$1.getAbsolutePath());
        ssc.start();
        // The input stream must remember data for exactly one batch duration.
        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(inputStream.rememberDuration());
        Duration $org_scalatest_assert_macro_right = $this.batchDuration();
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 87));
        ReceiverTracker receiverTracker = ssc.scheduler().receiverTracker();
        // For each batch: wait for fresh blocks, advance the clock by one batch,
        // then wait until blocks are allocated to the batch at the new clock time.
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), numBatches).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)batchNum -> {
            JobGeneratorSuite.waitForNewReceivedBlocks$1(testTimeout, receiverTracker);
            clock.advance($this.batchDuration().milliseconds());
            JobGeneratorSuite.waitForBlocksToBeAllocatedToBatch$1(clock.getTimeMillis(), testTimeout, receiverTracker, inputStream);
        });
        // Wait for the last batch to be pending. `eventually` only retries when
        // its block throws, so the condition has to be asserted; a bare boolean
        // result of `false` would be accepted immediately without waiting.
        Eventually$.MODULE$.eventually(testTimeout, (Function0 & Serializable & scala.Serializable)() -> {
            Seq pendingTimes = ssc.scheduler().getPendingTimes();
            Time lastBatchTime = new Time((long)numBatches * $this.batchDuration().milliseconds());
            Bool lastBatchPending = Bool$.MODULE$.binaryMacroBool((Object)pendingTimes, "contains", (Object)lastBatchTime, pendingTimes.contains((Object)lastBatchTime), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(lastBatchPending, (Object)"", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 118));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 117));
        // The blocks of the still-running batch must not have been cleaned up.
        Seq $org_scalatest_assert_macro_left2 = JobGeneratorSuite.getBlocksOfBatch$1(longBatchTime, receiverTracker, inputStream);
        Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left2, "nonEmpty", $org_scalatest_assert_macro_left2.nonEmpty(), Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"blocks of incomplete batch already deleted", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 122));
        // Fewer than longBatchNumber batches may have completed at this point.
        int $org_scalatest_assert_macro_left3 = batchCounter.getNumCompletedBatches();
        int $org_scalatest_assert_macro_right2 = longBatchNumber;
        Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left3), "<", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left3 < $org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
        Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 123));
        // Release the held-up batch, then stop the context.
        JobGeneratorSuite$.MODULE$.waitLatch().countDown();
        ssc.stop(ssc.stop$default$1());
    }

    /**
     * Initializes the TestSuiteBase trait state and registers the SPARK-6222 test.
     * The registered test creates a temporary checkpoint directory, sets the
     * streaming clock and write-ahead-log rolling interval on the suite's conf,
     * and runs {@code $anonfun$new$2} inside {@code withStreamingContext}.
     */
    public JobGeneratorSuite() {
        TestSuiteBase.$init$(this);
        this.test("SPARK-6222: Do not clear received block data too soon", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            File checkpointDir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2());
            // Note: this is the suite's own conf object, so the settings below are
            // applied to it directly rather than to a copy.
            SparkConf testConf = this.conf();
            testConf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
            testConf.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1");
            this.withStreamingContext(new StreamingContext(testConf, this.batchDuration()), (Function1 & Serializable & scala.Serializable)ssc -> {
                JobGeneratorSuite.$anonfun$new$2(this, checkpointDir, ssc);
                return BoxedUnit.UNIT;
            });
        }, new Position("JobGeneratorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 58));
    }
}

