package org.apache.spark.streaming;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.spark.util.ManualClock;
import scala.Serializable;
import scala.math.Numeric$LongIsIntegral$;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: InputStreamsSuite.scala */
/* loaded from: input_file:org/apache/spark/streaming/InputStreamsSuite$$anonfun$7$$anonfun$apply$mcV$sp$5.class */
/**
 * Closure body that runs against a {@link StreamingContext}: it registers a
 * {@code TestOutputStream} over the per-batch record count of a receiver
 * stream, starts the context, and then drives the context's clock forward
 * until the expected output has arrived or a wall-clock timeout expires.
 */
public final class InputStreamsSuite$$anonfun$7$$anonfun$apply$mcV$sp$5 extends AbstractFunction1<StreamingContext, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;

    /** Upper bound, in wall-clock milliseconds, on how long the polling loop may run. */
    private static final long WAIT_TIMEOUT_MS = 5000L;
    /** Real-time pause between two successive clock advances. */
    private static final long POLL_INTERVAL_MS = 100L;
    /** Real-time pause taken once after the polling loop has ended. */
    private static final long FINAL_PAUSE_MS = 1000L;

    private final /* synthetic */ InputStreamsSuite$$anonfun$7 $outer;
    private final int numTotalRecords$1;
    private final MultiThreadTestReceiver testReceiver$1;
    private final ConcurrentLinkedQueue outputQueue$2;

    /**
     * Wires the receiver stream's {@code count()} into {@code outputQueue$2},
     * starts the context, then repeatedly sleeps and advances the scheduler
     * clock by one batch duration while {@link #shouldKeepWaiting(long)} holds.
     *
     * @param streamingContext the context to register the output stream on and start
     */
    public final void apply(StreamingContext streamingContext) {
        new TestOutputStream(streamingContext.receiverStream(this.testReceiver$1, ClassTag$.MODULE$.Int()).count(), this.outputQueue$2, ClassTag$.MODULE$.Long()).register();
        streamingContext.start();
        ManualClock clock = streamingContext.scheduler().clock();
        long startMillis = System.currentTimeMillis();
        // The wait condition is the loop guard. Previously it was an `if` nested in
        // `while (true)` with no exit, so the loop never terminated and the pause
        // after it was unreachable.
        while (shouldKeepWaiting(startMillis)) {
            Thread.sleep(POLL_INTERVAL_MS);
            clock.advance(this.$outer.org$apache$spark$streaming$InputStreamsSuite$$anonfun$$$outer().batchDuration().milliseconds());
        }
        // NOTE(review): presumably gives in-flight output time to settle before the
        // caller inspects the queue - confirm against the original Scala test.
        Thread.sleep(FINAL_PAUSE_MS);
    }

    /**
     * Tells whether the polling loop should run another iteration.
     *
     * @param startMillis value of {@code System.currentTimeMillis()} captured before the loop
     * @return {@code true} while the timeout has not elapsed and either the receiver
     *         threads have not all finished or the summed output is still below
     *         {@code numTotalRecords$1}
     */
    private boolean shouldKeepWaiting(long startMillis) {
        // Evaluation order matches the original expression: the output sum is only
        // computed once all receiver threads report finished.
        boolean workRemaining = !MultiThreadTestReceiver$.MODULE$.haveAllThreadsFinished()
                || BoxesRunTime.unboxToLong(this.$outer.org$apache$spark$streaming$InputStreamsSuite$$anonfun$$output$2(this.outputQueue$2).sum(Numeric$LongIsIntegral$.MODULE$)) < this.numTotalRecords$1;
        return workRemaining && System.currentTimeMillis() - startMillis < WAIT_TIMEOUT_MS;
    }

    /** Erased bridge: casts the argument, delegates to the typed overload, and returns {@code BoxedUnit.UNIT}. */
    public final /* bridge */ /* synthetic */ Object apply(Object obj) {
        apply((StreamingContext) obj);
        return BoxedUnit.UNIT;
    }

    /**
     * Captures the enclosing closure and the values this function reads.
     *
     * @param inputStreamsSuite$$anonfun$7 enclosing closure instance; must not be {@code null}
     * @param i                            total record count the output sum is compared against
     * @param multiThreadTestReceiver      receiver passed to {@code receiverStream}
     * @param concurrentLinkedQueue        queue handed to the {@code TestOutputStream}
     * @throws NullPointerException if the enclosing closure is {@code null}
     */
    public InputStreamsSuite$$anonfun$7$$anonfun$apply$mcV$sp$5(InputStreamsSuite$$anonfun$7 inputStreamsSuite$$anonfun$7, int i, MultiThreadTestReceiver multiThreadTestReceiver, ConcurrentLinkedQueue concurrentLinkedQueue) {
        if (inputStreamsSuite$$anonfun$7 == null) {
            // Same exception type that `throw null` raised, stated explicitly.
            throw new NullPointerException();
        }
        this.$outer = inputStreamsSuite$$anonfun$7;
        this.numTotalRecords$1 = i;
        this.testReceiver$1 = multiThreadTestReceiver;
        this.outputQueue$2 = concurrentLinkedQueue;
    }
}
