package org.apache.spark.streaming;

import com.google.common.io.Files;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rdd.RDD;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Args;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfter;
import org.scalatest.BeforeAndAfterAll;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.enablers.Retrying$;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.generic.GenericTraversableTemplate;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Queue;
import scala.math.Numeric$LongIsIntegral$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: InputStreamsSuite.scala */
@ScalaSignature(bytes = "\u0006\u000112Aa\u0001\u0003\u0001\u001b!)A\u0004\u0001C\u0001;!)q\u0004\u0001C\u0001A\t\t\u0012J\u001c9viN#(/Z1ngN+\u0018\u000e^3\u000b\u0005\u00151\u0011!C:ue\u0016\fW.\u001b8h\u0015\t9\u0001\"A\u0003ta\u0006\u00148N\u0003\u0002\n\u0015\u00051\u0011\r]1dQ\u0016T\u0011aC\u0001\u0004_J<7\u0001A\n\u0005\u00019\u0011b\u0003\u0005\u0002\u0010!5\ta!\u0003\u0002\u0012\r\ti1\u000b]1sW\u001a+hnU;ji\u0016\u0004\"a\u0005\u000b\u000e\u0003\u0011I!!\u0006\u0003\u0003\u001bQ+7\u000f^*vSR,')Y:f!\t9\"$D\u0001\u0019\u0015\tI\"\"A\u0005tG\u0006d\u0017\r^3ti&\u00111\u0004\u0007\u0002\u000f\u0005\u00164wN]3B]\u0012\fe\r^3s\u0003\u0019a\u0014N\\5u}Q\ta\u0004\u0005\u0002\u0014\u0001\u0005qA/Z:u\r&dWm\u0015;sK\u0006lGCA\u0011(!\t\u0011S%D\u0001$\u0015\u0005!\u0013!B:dC2\f\u0017B\u0001\u0014$\u0005\u0011)f.\u001b;\t\u000b!\u0012\u0001\u0019A\u0015\u0002\u00199,wOR5mKN|e\u000e\\=\u0011\u0005\tR\u0013BA\u0016$\u0005\u001d\u0011un\u001c7fC:\u0004")
/* loaded from: input_file:org/apache/spark/streaming/InputStreamsSuite.class */
public class InputStreamsSuite extends SparkFunSuite implements TestSuiteBase, BeforeAndAfter {
    // State backing the BeforeAndAfter accessors. The two atomic holders are assigned by the
    // org$scalatest$BeforeAndAfter$_setter_$..._$eq methods rather than in a field initializer,
    // so they cannot be declared final in Java source.
    private AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    private AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    // Cached value returned by checkpointDir(); only meaningful once bitmap$0 is true.
    private String checkpointDir;
    // Assigned by org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq, hence not final.
    private SparkConf conf;
    // Assigned by org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq, hence not final.
    private PatienceConfiguration.Timeout eventuallyTimeout;
    // Set to true once checkpointDir has been computed and stored.
    private volatile boolean bitmap$0;

    /**
     * Synthetic super-accessor: forwards to {@code BeforeAndAfterEach.runTest$} with this suite
     * as the receiver.
     *
     * @param str passed through unchanged
     * @param args passed through unchanged
     * @return whatever the forwarded call returns
     */
    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$runTest(String str, Args args) {
        return BeforeAndAfterEach.runTest$(this, str, args);
    }

    /**
     * Synthetic super-accessor: forwards to {@code BeforeAndAfterAll.run$} with this suite as
     * the receiver.
     *
     * @param option passed through unchanged
     * @param args passed through unchanged
     * @return whatever the forwarded call returns
     */
    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$run(Option option, Args args) {
        return BeforeAndAfterAll.run$(this, option, args);
    }

    /**
     * Forwards to {@code BeforeAndAfter.before$} with this suite as the receiver.
     *
     * @param function0 passed through unchanged
     * @param position passed through unchanged
     */
    public void before(Function0<Object> function0, Position position) {
        BeforeAndAfter.before$(this, function0, position);
    }

    /**
     * Forwards to {@code BeforeAndAfter.after$} with this suite as the receiver.
     *
     * @param function0 passed through unchanged
     * @param position passed through unchanged
     */
    public void after(Function0<Object> function0, Position position) {
        BeforeAndAfter.after$(this, function0, position);
    }

    /**
     * Forwards to {@code BeforeAndAfter.runTest$} with this suite as the receiver.
     *
     * @param str passed through unchanged
     * @param args passed through unchanged
     * @return whatever the forwarded call returns
     */
    public Status runTest(String str, Args args) {
        return BeforeAndAfter.runTest$(this, str, args);
    }

    /**
     * Forwards to {@code BeforeAndAfter.run$} with this suite as the receiver.
     *
     * @param option passed through unchanged
     * @param args passed through unchanged
     * @return whatever the forwarded call returns
     */
    public Status run(Option<String> option, Args args) {
        return BeforeAndAfter.run$(this, option, args);
    }

    /**
     * Synthetic super-accessor: forwards to {@code BeforeAndAfterEach.beforeEach$} with this
     * suite as the receiver.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$beforeEach() {
        BeforeAndAfterEach.beforeEach$(this);
    }

    /**
     * Synthetic super-accessor: forwards to {@code BeforeAndAfterEach.afterEach$} with this
     * suite as the receiver.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$afterEach() {
        BeforeAndAfterEach.afterEach$(this);
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code framework()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public String framework() {
        return TestSuiteBase.super.framework();
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code master()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public String master() {
        return TestSuiteBase.super.master();
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code batchDuration()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public Duration batchDuration() {
        return TestSuiteBase.super.batchDuration();
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code numInputPartitions()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public int numInputPartitions() {
        return TestSuiteBase.super.numInputPartitions();
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code maxWaitTimeMillis()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public int maxWaitTimeMillis() {
        return TestSuiteBase.super.maxWaitTimeMillis();
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code useManualClock()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public boolean useManualClock() {
        return TestSuiteBase.super.useManualClock();
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code actuallyWait()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public boolean actuallyWait() {
        return TestSuiteBase.super.actuallyWait();
    }

    /**
     * Runs the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code beforeFunction()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public void beforeFunction() {
        TestSuiteBase.super.beforeFunction();
    }

    /**
     * Runs the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code afterFunction()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public void afterFunction() {
        TestSuiteBase.super.afterFunction();
    }

    /**
     * Runs the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code beforeEach()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public void beforeEach() {
        TestSuiteBase.super.beforeEach();
    }

    /**
     * Runs the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified
     * {@code afterEach()} here resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public void afterEach() {
        TestSuiteBase.super.afterEach();
    }

    /**
     * Delegates to the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @param streamingContext passed through unchanged
     * @param function1 passed through unchanged
     * @param <R> result type of {@code function1}
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <R> R withStreamingContext(StreamingContext streamingContext, Function1<StreamingContext, R> function1) {
        return TestSuiteBase.super.withStreamingContext(streamingContext, function1);
    }

    /**
     * Delegates to the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @param testServer passed through unchanged
     * @param function1 passed through unchanged
     * @param <R> result type of {@code function1}
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <R> R withTestServer(TestServer testServer, Function1<TestServer, R> function1) {
        return TestSuiteBase.super.withTestServer(testServer, function1);
    }

    /**
     * Single-input overload; delegates to the {@code TestSuiteBase} implementation.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the streaming context returned by the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> StreamingContext setupStreams(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, int i, ClassTag<U> classTag, ClassTag<V> classTag2) {
        return TestSuiteBase.super.setupStreams(seq, function1, i, classTag, classTag2);
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> int setupStreams$default$3() {
        return TestSuiteBase.super.setupStreams$default$3();
    }

    /**
     * Two-input overload; delegates to the {@code TestSuiteBase} implementation.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the streaming context returned by the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> StreamingContext setupStreams(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        return TestSuiteBase.super.setupStreams(seq, seq2, function2, classTag, classTag2, classTag3);
    }

    /**
     * Delegates to the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the sequences returned by the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Seq<Seq<V>> runStreams(StreamingContext streamingContext, int i, int i2, Function0<BoxedUnit> function0, ClassTag<V> classTag) {
        return TestSuiteBase.super.runStreams(streamingContext, i, i2, function0, classTag);
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Function0<BoxedUnit> runStreams$default$4() {
        return TestSuiteBase.super.runStreams$default$4();
    }

    /**
     * Delegates to the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the nested sequences returned by the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Seq<Seq<Seq<V>>> runStreamsWithPartitions(StreamingContext streamingContext, int i, int i2, Function0<BoxedUnit> function0, ClassTag<V> classTag) {
        return TestSuiteBase.super.runStreamsWithPartitions(streamingContext, i, i2, function0, classTag);
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> Function0<BoxedUnit> runStreamsWithPartitions$default$4() {
        return TestSuiteBase.super.runStreamsWithPartitions$default$4();
    }

    /**
     * Delegates to the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <V> void verifyOutput(Seq<Seq<V>> seq, Seq<Seq<V>> seq2, boolean z, ClassTag<V> classTag) {
        TestSuiteBase.super.verifyOutput(seq, seq2, z, classTag);
    }

    /**
     * Single-input overload without a batch count; delegates to the {@code TestSuiteBase}
     * implementation.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> void testOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2) {
        TestSuiteBase.super.testOperation(seq, function1, seq2, z, classTag, classTag2);
    }

    /**
     * Returns the value produced by the {@code TestSuiteBase} implementation of this method.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     *
     * @return the result of the interface implementation
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> boolean testOperation$default$4() {
        return TestSuiteBase.super.testOperation$default$4();
    }

    /**
     * Single-input overload taking an {@code int} argument; delegates to the
     * {@code TestSuiteBase} implementation.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V> void testOperation(Seq<Seq<U>> seq, Function1<DStream<U>, DStream<V>> function1, Seq<Seq<V>> seq2, int i, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2) {
        TestSuiteBase.super.testOperation(seq, function1, seq2, i, z, classTag, classTag2);
    }

    /**
     * Two-input overload without a batch count; delegates to the {@code TestSuiteBase}
     * implementation.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> void testOperation(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, Seq<Seq<W>> seq3, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        TestSuiteBase.super.testOperation(seq, seq2, function2, seq3, z, classTag, classTag2, classTag3);
    }

    /**
     * Two-input overload taking an {@code int} argument; delegates to the
     * {@code TestSuiteBase} implementation.
     *
     * <p>The qualified {@code TestSuiteBase.super} call is required: an unqualified call here
     * resolves to this override and recurses without bound.
     * NOTE(review): assumes TestSuiteBase declares this as a default method — confirm.
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public <U, V, W> void testOperation(Seq<Seq<U>> seq, Seq<Seq<V>> seq2, Function2<DStream<U>, DStream<V>, DStream<W>> function2, Seq<Seq<W>> seq3, int i, boolean z, ClassTag<U> classTag, ClassTag<V> classTag2, ClassTag<W> classTag3) {
        TestSuiteBase.super.testOperation(seq, seq2, function2, seq3, i, z, classTag, classTag2, classTag3);
    }

    /** Returns the atomic reference held in the {@code org$scalatest$BeforeAndAfter$$beforeFunctionAtomic} field. */
    public final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    /** Returns the atomic reference held in the {@code org$scalatest$BeforeAndAfter$$afterFunctionAtomic} field. */
    public final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    /** Returns the current value of the volatile {@code runHasBeenInvoked} flag. */
    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    /**
     * Overwrites the volatile {@code runHasBeenInvoked} flag.
     *
     * @param z new flag value
     */
    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean z) {
        this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked = z;
    }

    /**
     * Stores the given reference in the {@code org$scalatest$BeforeAndAfter$$beforeFunctionAtomic} field.
     *
     * @param atomicReference value to store
     */
    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> atomicReference) {
        this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = atomicReference;
    }

    /**
     * Stores the given reference in the {@code org$scalatest$BeforeAndAfter$$afterFunctionAtomic} field.
     *
     * @param atomicReference value to store
     */
    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> atomicReference) {
        this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic = atomicReference;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.spark.streaming.InputStreamsSuite] */
    private String checkpointDir$lzycompute() {
        String checkpointDir;
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                checkpointDir = checkpointDir();
                this.checkpointDir = checkpointDir;
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.checkpointDir;
    }

    /**
     * Returns the cached checkpoint directory, computing it through
     * {@code checkpointDir$lzycompute()} when {@code bitmap$0} is not yet set.
     *
     * @return the checkpoint directory string
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public String checkpointDir() {
        if (this.bitmap$0) {
            return this.checkpointDir;
        }
        return checkpointDir$lzycompute();
    }

    /** Returns the {@code SparkConf} stored in the {@code conf} field. */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public SparkConf conf() {
        return this.conf;
    }

    /** Returns the timeout stored in the {@code eventuallyTimeout} field. */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public PatienceConfiguration.Timeout eventuallyTimeout() {
        return this.eventuallyTimeout;
    }

    /**
     * Stores the given configuration in the {@code conf} field.
     *
     * @param sparkConf value to store
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(SparkConf sparkConf) {
        this.conf = sparkConf;
    }

    /**
     * Stores the given timeout in the {@code eventuallyTimeout} field.
     *
     * @param timeout value to store
     */
    @Override // org.apache.spark.streaming.TestSuiteBase
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(PatienceConfiguration.Timeout timeout) {
        this.eventuallyTimeout = timeout;
    }

    /**
     * Hands {@code withTempDir} a callback that invokes {@code $anonfun$testFileStream$1}
     * with this suite, the flag, and the supplied directory.
     *
     * @param z flag forwarded unchanged to {@code $anonfun$testFileStream$1}
     */
    public void testFileStream(boolean z) {
        withTempDir(file -> {
            $anonfun$testFileStream$1(this, z, file);
            // The callback has no meaningful result; return Scala's unit value.
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Logs the elements of {@code seq}, comma-separated and wrapped in square brackets, at
     * info level. The message is built inside the supplier passed to {@code logInfo}.
     */
    public static final /* synthetic */ void $anonfun$new$10(InputStreamsSuite inputStreamsSuite, Seq seq) {
        inputStreamsSuite.logInfo(() -> {
            StringBuilder line = new StringBuilder(2);
            line.append("[");
            line.append(seq.mkString(","));
            line.append("]");
            return line.toString();
        });
    }

    /**
     * Logs the characters of {@code str}, comma-separated and wrapped in square brackets, at
     * info level. The message is built inside the supplier passed to {@code logInfo}.
     */
    public static final /* synthetic */ void $anonfun$new$14(InputStreamsSuite inputStreamsSuite, String str) {
        inputStreamsSuite.logInfo(() -> {
            String joined = new StringOps(Predef$.MODULE$.augmentString(str)).mkString(",");
            StringBuilder line = new StringBuilder(2);
            line.append("[").append(joined).append("]");
            return line.toString();
        });
    }

    /**
     * Asserts that {@code strArr[i]} equals {@code seq.apply(i)} using scalactic's
     * {@code ===} comparison.
     *
     * @return the assertion result produced by the scalatest assertions helper
     */
    public static final /* synthetic */ Assertion $anonfun$new$17(InputStreamsSuite inputStreamsSuite, String[] strArr, Seq seq, int i) {
        TripleEqualsSupport.Equalizer left = inputStreamsSuite.convertToEqualizer(strArr[i]);
        String right = (String) seq.apply(i);
        Bool comparison = Bool$.MODULE$.binaryMacroBool(
                left,
                "===",
                right,
                left.$eq$eq$eq(right, Equality$.MODULE$.default()),
                Prettifier$.MODULE$.default());
        return Assertions$.MODULE$.assertionsHelper().macroAssert(
                comparison,
                "",
                Prettifier$.MODULE$.default(),
                new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 90));
    }

    public static final /* synthetic */ void $anonfun$new$21(InputStreamsSuite inputStreamsSuite, TestServer testServer, StreamingContext streamingContext) {
        BatchCounter batchCounter = new BatchCounter(streamingContext);
        ReceiverInputDStream socketTextStream = streamingContext.socketTextStream("localhost", testServer.port(), StorageLevel$.MODULE$.MEMORY_AND_DISK());
        new TestOutputStream(socketTextStream, new ConcurrentLinkedQueue(), ClassTag$.MODULE$.apply(String.class)).register();
        streamingContext.start();
        streamingContext.scheduler().clock().advance(inputStreamsSuite.batchDuration().milliseconds());
        if (!batchCounter.waitUntilBatchesCompleted(1, 30000L)) {
            throw inputStreamsSuite.fail("Timeout: cannot finish all batches in 30 seconds", new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 120));
        }
        socketTextStream.generatedRDDs().foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(((RDD) tuple2._2()) instanceof WriteAheadLogBackedBlockRDD, "rdd.isInstanceOf[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD[_]]", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 124));
        });
    }

    /**
     * Starts the test server, then runs {@code $anonfun$new$21} inside
     * {@code withStreamingContext} using a context built from the suite's conf and batch
     * duration.
     */
    public static final /* synthetic */ void $anonfun$new$20(InputStreamsSuite inputStreamsSuite, TestServer testServer) {
        testServer.start();
        StreamingContext freshContext = new StreamingContext(inputStreamsSuite.conf(), inputStreamsSuite.batchDuration());
        inputStreamsSuite.withStreamingContext(freshContext, ssc -> {
            $anonfun$new$21(inputStreamsSuite, testServer, ssc);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Adds {@code i} to {@code b} and narrows the sum back to a byte, so out-of-range sums
     * wrap around.
     *
     * @return the low eight bits of {@code b + i} as a signed byte
     */
    public static final /* synthetic */ byte $anonfun$new$28(int i, byte b) {
        int widened = b + i;
        return (byte) widened;
    }

    public static final /* synthetic */ Assertion $anonfun$new$27(InputStreamsSuite inputStreamsSuite, Duration duration, File file, byte[] bArr, ManualClock manualClock, BatchCounter batchCounter, int i) {
        Thread.sleep(duration.milliseconds());
        File file2 = new File(file, Integer.toString(i));
        Files.write((byte[]) new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(bArr)).map(obj -> {
            return BoxesRunTime.boxToByte($anonfun$new$28(i, BoxesRunTime.unboxToByte(obj)));
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Byte())), file2);
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(file2.setLastModified(manualClock.getTimeMillis()), "file.setLastModified(clock.getTimeMillis())", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 160));
        TripleEqualsSupport.Equalizer convertToEqualizer = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToLong(file2.lastModified()));
        long timeMillis = manualClock.getTimeMillis();
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToLong(timeMillis), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToLong(timeMillis), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 161));
        inputStreamsSuite.logInfo(() -> {
            return new StringBuilder(13).append("Created file ").append(file2).toString();
        });
        manualClock.advance(duration.milliseconds());
        return (Assertion) Eventually$.MODULE$.eventually(inputStreamsSuite.eventuallyTimeout(), () -> {
            TripleEqualsSupport.Equalizer convertToEqualizer2 = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToInteger(batchCounter.getNumCompletedBatches()));
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", BoxesRunTime.boxToInteger(i), convertToEqualizer2.$eq$eq$eq(BoxesRunTime.boxToInteger(i), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 167));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 166));
    }

    /**
     * Passes {@code bArr} through {@code Predef.byteArrayOps} and returns the result.
     *
     * @param bArr array handed to {@code byteArrayOps}
     * @return whatever {@code byteArrayOps} returns for {@code bArr}
     */
    public static final /* synthetic */ byte[] $anonfun$new$32(byte[] bArr) {
        return Predef$.MODULE$.byteArrayOps(bArr);
    }

    /**
     * Adds {@code i} to {@code b} and narrows the sum back to a byte, so out-of-range sums
     * wrap around.
     *
     * @return the low eight bits of {@code b + i} as a signed byte
     */
    public static final /* synthetic */ byte $anonfun$new$34(int i, byte b) {
        int widened = b + i;
        return (byte) widened;
    }

    /**
     * Asserts that element {@code i} of {@code seq} equals {@code bArr} with every byte offset
     * by {@code i}.
     *
     * @return the assertion result produced by the scalatest assertions helper
     */
    public static final /* synthetic */ Assertion $anonfun$new$33(InputStreamsSuite inputStreamsSuite, Seq seq, byte[] bArr, int i) {
        TripleEqualsSupport.Equalizer received = inputStreamsSuite.convertToEqualizer(seq.apply(i));
        byte[] expected = (byte[]) new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(bArr)).map(element -> {
            return BoxesRunTime.boxToByte($anonfun$new$34(i, BoxesRunTime.unboxToByte(element)));
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Byte()));
        Bool comparison = Bool$.MODULE$.binaryMacroBool(
                received,
                "===",
                expected,
                received.$eq$eq$eq(expected, Equality$.MODULE$.default()),
                Prettifier$.MODULE$.default());
        return Assertions$.MODULE$.assertionsHelper().macroAssert(
                comparison,
                "",
                Prettifier$.MODULE$.default(),
                new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 172));
    }

    /**
     * Body of the binary-records test that runs inside the streaming context. Sets the manual
     * clock one batch past {@code file}'s modification time, registers a binary-records stream
     * over {@code file2} with record length 1, and starts the context. It then runs
     * {@code $anonfun$new$27} for indices 0 to 2 and checks each collected batch with
     * {@code $anonfun$new$33}.
     */
    public static final /* synthetic */ void $anonfun$new$26(InputStreamsSuite inputStreamsSuite, File file, Duration duration, File file2, StreamingContext streamingContext) {
        ManualClock manualClock = streamingContext.scheduler().clock();
        manualClock.setTime(file.lastModified() + duration.milliseconds());
        BatchCounter completed = new BatchCounter(streamingContext);
        DStream records = streamingContext.binaryRecordsStream(file2.toString(), 1);
        ConcurrentLinkedQueue collected = new ConcurrentLinkedQueue();
        new TestOutputStream(records, collected, ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE))).register();
        streamingContext.start();
        // Advance by half a batch interval before the first file is written.
        manualClock.advance(duration.milliseconds() / 2);

        byte[] payload = (byte[]) Array$.MODULE$.apply(Predef$.MODULE$.wrapByteArray(new byte[]{1, 2, 3, 4, 5}), ClassTag$.MODULE$.Byte());
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach(index -> {
            return $anonfun$new$27(inputStreamsSuite, duration, file2, payload, manualClock, completed, BoxesRunTime.unboxToInt(index));
        });

        Seq flattenedBatches = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(collected).asScala()).map(batch -> {
            return batch.flatten(record -> {
                return new ArrayOps.ofByte($anonfun$new$32(record));
            });
        }, Iterable$.MODULE$.canBuildFrom())).toSeq();
        flattenedBatches.indices().foreach(index -> {
            return $anonfun$new$33(inputStreamsSuite, flattenedBatches, payload, BoxesRunTime.unboxToInt(index));
        });
    }

    public static final /* synthetic */ void $anonfun$new$24(InputStreamsSuite inputStreamsSuite, File file) {
        Bool simpleMacroBool;
        Duration apply = Seconds$.MODULE$.apply(2L);
        File file2 = new File(file, "0");
        Files.write("0\n", file2, StandardCharsets.UTF_8);
        Bool simpleMacroBool2 = Bool$.MODULE$.simpleMacroBool(file2.setLastModified(10000L), "existingFile.setLastModified(10000L)", Prettifier$.MODULE$.default());
        if (simpleMacroBool2.value()) {
            TripleEqualsSupport.Equalizer convertToEqualizer = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToLong(file2.lastModified()));
            simpleMacroBool = Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(10000), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(10000), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        } else {
            simpleMacroBool = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
        }
        Bool bool = simpleMacroBool;
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(simpleMacroBool2, "&&", bool, simpleMacroBool2.$amp$amp(() -> {
            return bool;
        }), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 136));
        inputStreamsSuite.withStreamingContext(new StreamingContext(inputStreamsSuite.conf(), apply), streamingContext -> {
            $anonfun$new$26(inputStreamsSuite, file2, apply, file, streamingContext);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Writes {@code i} followed by a newline to a file named {@code i} inside {@code file},
     * stamps the file with the manual clock's current time, advances the clock by one batch,
     * and waits until the completed-batch count equals {@code i}.
     *
     * <p>The file is created under the {@code file} directory argument. Previously that
     * argument was ignored and every file landed in {@code file2}, so a caller passing two
     * different directories still wrote everything into one.
     *
     * <p>JADX note: access modifiers changed from private.
     *
     * @param i value written to the file; also the file name and the expected completed-batch count
     * @param file directory in which the file is created
     * @param file2 retained for signature compatibility; no longer read
     * @param manualClock clock providing the modification time and advanced after the write
     * @param duration amount by which the clock is advanced
     * @param batchCounter source of the completed-batch count that is awaited
     */
    public final void createFileAndAdvanceTime$1(int i, File file, File file2, ManualClock manualClock, Duration duration, BatchCounter batchCounter) {
        File file3 = new File(file, Integer.toString(i));
        Files.write(new StringBuilder(1).append(i).append("\n").toString(), file3, StandardCharsets.UTF_8);
        // The modification time must equal the manual clock's current time.
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(file3.setLastModified(manualClock.getTimeMillis()), "file.setLastModified(clock.getTimeMillis())", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 219));
        TripleEqualsSupport.Equalizer convertToEqualizer = convertToEqualizer(BoxesRunTime.boxToLong(file3.lastModified()));
        long timeMillis = manualClock.getTimeMillis();
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToLong(timeMillis), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToLong(timeMillis), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 220));
        logInfo(() -> {
            return new StringBuilder(13).append("Created file ").append(file3).toString();
        });
        manualClock.advance(duration.milliseconds());
        // Block until the batch triggered by the clock advance has completed.
        Eventually$.MODULE$.eventually(eventuallyTimeout(), () -> {
            TripleEqualsSupport.Equalizer convertToEqualizer2 = this.convertToEqualizer(BoxesRunTime.boxToInteger(batchCounter.getNumCompletedBatches()));
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", BoxesRunTime.boxToInteger(i), convertToEqualizer2.$eq$eq$eq(BoxesRunTime.boxToInteger(i), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 226));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 225));
    }

    public static final /* synthetic */ void $anonfun$new$38(InputStreamsSuite inputStreamsSuite, File file) {
        Bool simpleMacroBool;
        Duration apply = Seconds$.MODULE$.apply(2L);
        File createDirectory = Utils$.MODULE$.createDirectory(file.toString(), "tmp1");
        File createDirectory2 = Utils$.MODULE$.createDirectory(file.toString(), "tmp2");
        File file2 = new File(file, "0");
        Files.write("0\n", file2, StandardCharsets.UTF_8);
        Bool simpleMacroBool2 = Bool$.MODULE$.simpleMacroBool(file2.setLastModified(10000L), "existingFile.setLastModified(10000L)", Prettifier$.MODULE$.default());
        if (simpleMacroBool2.value()) {
            TripleEqualsSupport.Equalizer convertToEqualizer = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToLong(file2.lastModified()));
            simpleMacroBool = Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(10000), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(10000), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        } else {
            simpleMacroBool = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
        }
        Bool bool = simpleMacroBool;
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(simpleMacroBool2, "&&", bool, simpleMacroBool2.$amp$amp(() -> {
            return bool;
        }), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 195));
        String sb = new StringBuilder(3).append(file.toString()).append("/*/").toString();
        inputStreamsSuite.withStreamingContext(new StreamingContext(inputStreamsSuite.conf(), apply), streamingContext -> {
            ManualClock clock = streamingContext.scheduler().clock();
            clock.setTime(file2.lastModified() + apply.milliseconds());
            BatchCounter batchCounter = new BatchCounter(streamingContext);
            DStream map = streamingContext.fileStream(sb, ClassTag$.MODULE$.apply(LongWritable.class), ClassTag$.MODULE$.apply(Text.class), ClassTag$.MODULE$.apply(TextInputFormat.class)).map(tuple2 -> {
                return ((Text) tuple2._2()).toString();
            }, ClassTag$.MODULE$.apply(String.class));
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            new TestOutputStream(map, concurrentLinkedQueue, ClassTag$.MODULE$.apply(String.class)).register();
            streamingContext.start();
            clock.advance(apply.milliseconds() / 2);
            Seq apply2 = Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4, 5}));
            apply2.foreach(i -> {
                inputStreamsSuite.createFileAndAdvanceTime$1(i, createDirectory, createDirectory, clock, apply, batchCounter);
            });
            Seq apply3 = Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{6, 7, 8, 9, 10}));
            apply3.foreach(i2 -> {
                inputStreamsSuite.createFileAndAdvanceTime$1(i2, createDirectory2, createDirectory, clock, apply, batchCounter);
            });
            Set set = ((TraversableOnce) ((TraversableLike) apply2.$plus$plus(apply3, Seq$.MODULE$.canBuildFrom())).map(obj -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj));
            }, Seq$.MODULE$.canBuildFrom())).toSet();
            TripleEqualsSupport.Equalizer convertToEqualizer2 = inputStreamsSuite.convertToEqualizer(((GenericTraversableTemplate) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).flatten(Predef$.MODULE$.$conforms()).toSet());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", set, convertToEqualizer2.$eq$eq$eq(set, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 239));
        });
    }

    /**
     * Writes {@code str} to {@code path} as UTF-8 text.
     *
     * <p>The output stream is managed by a try-with-resources statement so that it is closed
     * even when the write fails part-way. The previous version only closed the stream on the
     * success path and leaked it if {@code IOUtils.write} threw.
     *
     * @param path destination file
     * @param str text to write
     * @param fileSystem file system on which the file is created
     */
    private static final void write$1(Path path, String str, FileSystem fileSystem) {
        // The second argument to create() stays true, exactly as in the original call.
        try (FSDataOutputStream out = fileSystem.create(path, true)) {
            IOUtils.write(str, out, StandardCharsets.UTF_8);
        }
    }

    /**
     * Body of the "Modified files are correctly detected." test, run against a temporary
     * directory.
     *
     * <p>Directory layout derived from {@code file}:
     * <ul>
     *   <li>{@code path}  - root of the temporary directory</li>
     *   <li>{@code path2} - {@code <root>/streaming}</li>
     *   <li>{@code path3} - glob {@code <root>/streaming/sub*}, the path the stream watches</li>
     *   <li>{@code path4} - {@code <root>/generated/subdir}, where files are first written</li>
     *   <li>{@code path5} - {@code <root>/streaming/subdir}, the rename target of {@code path4}</li>
     * </ul>
     *
     * @param inputStreamsSuite the suite instance supplying configuration and assertion helpers
     * @param file temporary directory used as the root of the test tree
     */
    public static final /* synthetic */ void $anonfun$new$48(InputStreamsSuite inputStreamsSuite, File file) {
        Duration apply = Seconds$.MODULE$.apply(2L);
        long milliseconds = apply.milliseconds();
        Path path = new Path(file.toURI());
        Path path2 = new Path(path, "streaming");
        Path path3 = new Path(path2, "sub*");
        Path path4 = new Path(new Path(path, "generated"), "subdir");
        Path path5 = new Path(path2, "subdir");
        inputStreamsSuite.withStreamingContext(new StreamingContext(inputStreamsSuite.conf(), apply), streamingContext -> {
            FileSystem fileSystem = FileSystem.get(path.toUri(), streamingContext.sparkContext().hadoopConfiguration());
            // Start from a clean tree: delete the root, then recreate the directories used below.
            fileSystem.delete(path, true);
            fileSystem.mkdirs(path);
            fileSystem.mkdirs(path2);
            fileSystem.mkdirs(path4);
            ManualClock clock = streamingContext.scheduler().clock();
            // Write a file before the stream starts and set the clock one batch past its
            // modification time.
            Path path6 = new Path(path4, "existing");
            write$1(path6, "existing\n", fileSystem);
            clock.setTime(fileSystem.getFileStatus(path6).getModificationTime() + milliseconds);
            BatchCounter batchCounter = new BatchCounter(streamingContext);
            DStream textFileStream = streamingContext.textFileStream(path3.toUri().toString());
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            new TestOutputStream(textFileStream, concurrentLinkedQueue, ClassTag$.MODULE$.apply(String.class)).register();
            streamingContext.start();
            // Advance one batch and wait until exactly one batch has completed.
            clock.advance(milliseconds);
            Eventually$.MODULE$.eventually(inputStreamsSuite.eventuallyTimeout(), () -> {
                TripleEqualsSupport.Equalizer convertToEqualizer = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToInteger(1));
                int numCompletedBatches = batchCounter.getNumCompletedBatches();
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(numCompletedBatches), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(numCompletedBatches), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 285));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 284));
            // Write a second file next to the existing one and stamp both of its times with the
            // current manual-clock time.
            Path path7 = new Path(path4, "renamed.txt");
            write$1(path7, "renamed\n", fileSystem);
            long timeMillis = clock.getTimeMillis();
            fileSystem.setTimes(path7, timeMillis, timeMillis);
            // NOTE(review): this reads the modification time of path6 (the "existing" file), not
            // path7; presumably intentional -- confirm against the Scala source.
            long modificationTime = fileSystem.getFileStatus(path6).getModificationTime();
            long j = timeMillis + milliseconds;
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(modificationTime), "<", BoxesRunTime.boxToLong(j), modificationTime < j, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 294));
            // Rename generated/subdir to streaming/subdir, which matches the watched "sub*" glob.
            fileSystem.rename(path4, path5);
            // Advance one more batch and wait until two batches have completed.
            clock.advance(milliseconds);
            Eventually$.MODULE$.eventually(inputStreamsSuite.eventuallyTimeout(), () -> {
                TripleEqualsSupport.Equalizer convertToEqualizer = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToInteger(2));
                int numCompletedBatches = batchCounter.getNumCompletedBatches();
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(numCompletedBatches), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(numCompletedBatches), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 303));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 302));
            // The flattened output must be exactly {"renamed"}: the "existing" line is not expected.
            TripleEqualsSupport.Equalizer convertToEqualizer = inputStreamsSuite.convertToEqualizer(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"renamed"})));
            Set set = ((GenericTraversableTemplate) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).flatten(Predef$.MODULE$.$conforms()).toSet();
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", set, convertToEqualizer.$eq$eq$eq(set, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 307));
        });
    }

    /**
     * Exposes the collected output as a single flattened Scala iterable.
     *
     * @param concurrentLinkedQueue queue of collected batches
     * @return the queue's elements, viewed as a Scala collection and flattened one level
     */
    private static final Iterable output$2(ConcurrentLinkedQueue concurrentLinkedQueue) {
        // Step 1: wrap the Java queue in its Scala iterable view.
        Object scalaView = JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala();
        // Step 2: flatten the nested collections into one.
        GenericTraversableTemplate batches = (GenericTraversableTemplate) scalaView;
        return batches.flatten(Predef$.MODULE$.$conforms());
    }

    /**
     * Body of the "multi-thread receiver" test that runs inside a streaming context.
     *
     * <p>Registers a {@code TestOutputStream} over the per-batch record count of the receiver
     * stream, starts the context, and then repeatedly sleeps 100 ms and advances the manual clock
     * by one batch duration. The loop continues while the receiver threads have not all finished
     * or the summed output is still below {@code i}, but for less than five seconds in total.
     * After the loop it sleeps for one more second.
     *
     * <p>Fix: the loop was previously {@code while (true) { if (cond) { ... } }}, which never
     * terminated, busy-spun once the condition became false, and left the trailing sleep
     * unreachable. The condition is now the loop guard.
     *
     * @param inputStreamsSuite suite instance supplying the batch duration
     * @param multiThreadTestReceiver receiver feeding the stream
     * @param concurrentLinkedQueue queue that collects the per-batch counts
     * @param i total number of records expected across all batches
     * @param streamingContext context in which the stream is created and started
     */
    public static final /* synthetic */ void $anonfun$new$53(InputStreamsSuite inputStreamsSuite, MultiThreadTestReceiver multiThreadTestReceiver, ConcurrentLinkedQueue concurrentLinkedQueue, int i, StreamingContext streamingContext) {
        new TestOutputStream(streamingContext.receiverStream(multiThreadTestReceiver, ClassTag$.MODULE$.Int()).count(), concurrentLinkedQueue, ClassTag$.MODULE$.Long()).register();
        streamingContext.start();
        ManualClock clock = streamingContext.scheduler().clock();
        long nanoTime = System.nanoTime();
        // Loop-invariant deadline, hoisted out of the guard.
        long timeoutNanos = TimeUnit.SECONDS.toNanos(5L);
        while ((!MultiThreadTestReceiver$.MODULE$.haveAllThreadsFinished() || BoxesRunTime.unboxToLong(output$2(concurrentLinkedQueue).sum(Numeric$LongIsIntegral$.MODULE$)) < i) && System.nanoTime() - nanoTime < timeoutNanos) {
            Thread.sleep(100L);
            clock.advance(inputStreamsSuite.batchDuration().milliseconds());
        }
        Thread.sleep(1000L);
    }

    /**
     * Hands {@code logInfo} a message supplier that renders {@code seq} as a bracketed,
     * comma-separated list such as {@code [a,b]}. The "multi-thread receiver" test calls this
     * once for each collected output element.
     *
     * @param inputStreamsSuite suite whose {@code logInfo} is invoked
     * @param seq sequence to render
     */
    public static final /* synthetic */ void $anonfun$new$57(InputStreamsSuite inputStreamsSuite, Seq seq) {
        inputStreamsSuite.logInfo(() -> "[" + seq.mkString(",") + "]");
    }

    /**
     * Returns the collected batches with empty ones removed.
     *
     * @param concurrentLinkedQueue queue of collected batches
     * @return the queue's elements, viewed as a Scala collection, keeping only non-empty ones
     */
    private static final Iterable output$3(ConcurrentLinkedQueue concurrentLinkedQueue) {
        TraversableLike batches = (TraversableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala();
        return (Iterable) batches.filter(batch -> BoxesRunTime.boxToBoolean(batch.nonEmpty()));
    }

    /**
     * Body of the "queue input stream - oneAtATime = true" test that runs inside a streaming
     * context.
     *
     * <p>Registers a {@code TestOutputStream} over a queue stream created with the flag
     * {@code true} and starts the context. Then, once per index of {@code seq}, it takes up to
     * two pending items from a shared iterator, enqueues each as a single-element RDD, and
     * advances the manual clock by one batch duration. It finishes with a one-second sleep.
     *
     * @param inputStreamsSuite suite instance supplying the batch duration
     * @param concurrentLinkedQueue queue that collects the stream's output
     * @param seq input items to feed through the queue
     * @param streamingContext context in which the stream is created and started
     */
    public static final /* synthetic */ void $anonfun$new$63(InputStreamsSuite inputStreamsSuite, ConcurrentLinkedQueue concurrentLinkedQueue, Seq seq, StreamingContext streamingContext) {
        Queue rddQueue = new Queue();
        new TestOutputStream(
                streamingContext.queueStream(rddQueue, true, ClassTag$.MODULE$.apply(String.class)),
                concurrentLinkedQueue,
                ClassTag$.MODULE$.apply(String.class)).register();
        streamingContext.start();
        ManualClock manualClock = streamingContext.scheduler().clock();
        Iterator remaining = seq.iterator();
        seq.indices().foreach$mVc$sp(tick -> {
            // More than one item may be enqueued per tick.
            remaining.take(2).foreach(record -> {
                // Appends happen while holding the queue's monitor.
                synchronized (rddQueue) {
                    SparkContext sc = streamingContext.sparkContext();
                    return rddQueue.$plus$eq(sc.makeRDD(new $colon.colon(record, Nil$.MODULE$), sc.makeRDD$default$2(), ClassTag$.MODULE$.apply(String.class)));
                }
            });
            manualClock.advance(inputStreamsSuite.batchDuration().milliseconds());
        });
        Thread.sleep(1000L);
    }

    /**
     * Hands {@code logInfo} a message supplier that renders {@code seq} as a bracketed,
     * comma-separated list such as {@code [a,b]}. The "queue input stream - oneAtATime = true"
     * test calls this once for each collected output element.
     *
     * @param inputStreamsSuite suite whose {@code logInfo} is invoked
     * @param seq sequence to render
     */
    public static final /* synthetic */ void $anonfun$new$69(InputStreamsSuite inputStreamsSuite, Seq seq) {
        inputStreamsSuite.logInfo(() -> "[" + seq.mkString(",") + "]");
    }

    /**
     * Hands {@code logInfo} a message supplier that renders {@code seq} as a bracketed,
     * comma-separated list such as {@code [a,b]}. The "queue input stream - oneAtATime = true"
     * test calls this once for each expected output element.
     *
     * @param inputStreamsSuite suite whose {@code logInfo} is invoked
     * @param seq sequence to render
     */
    public static final /* synthetic */ void $anonfun$new$73(InputStreamsSuite inputStreamsSuite, Seq seq) {
        inputStreamsSuite.logInfo(() -> "[" + seq.mkString(",") + "]");
    }

    /**
     * Returns the collected batches with empty ones removed.
     *
     * @param concurrentLinkedQueue queue of collected batches
     * @return the queue's elements, viewed as a Scala collection, keeping only non-empty ones
     */
    private static final Iterable output$4(ConcurrentLinkedQueue concurrentLinkedQueue) {
        TraversableLike collected = (TraversableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala();
        return (Iterable) collected.filter(batch -> BoxesRunTime.boxToBoolean(batch.nonEmpty()));
    }

    /**
     * Test body that feeds a queue stream created with the flag {@code false} (presumably
     * {@code oneAtATime = false}; the registering test is not visible here -- confirm).
     *
     * <p>Registers a {@code TestOutputStream} over the queue stream and starts the context. It
     * then enqueues the first three items of {@code seq} as single-element RDDs, advances the
     * manual clock by one batch duration and sleeps one second. After that it enqueues every
     * remaining item, advances the clock by another batch duration and sleeps one second again.
     *
     * @param inputStreamsSuite suite instance supplying the batch duration
     * @param concurrentLinkedQueue queue that collects the stream's output
     * @param seq input items to feed through the queue
     * @param streamingContext context in which the stream is created and started
     */
    public static final /* synthetic */ void $anonfun$new$79(InputStreamsSuite inputStreamsSuite, ConcurrentLinkedQueue concurrentLinkedQueue, Seq seq, StreamingContext streamingContext) {
        Queue rddQueue = new Queue();
        new TestOutputStream(
                streamingContext.queueStream(rddQueue, false, ClassTag$.MODULE$.apply(String.class)),
                concurrentLinkedQueue,
                ClassTag$.MODULE$.apply(String.class)).register();
        streamingContext.start();
        ManualClock manualClock = streamingContext.scheduler().clock();
        Iterator remaining = seq.iterator();

        // First wave: the first three items.
        remaining.take(3).foreach(record -> {
            synchronized (rddQueue) {
                SparkContext sc = streamingContext.sparkContext();
                return rddQueue.$plus$eq(sc.makeRDD(new $colon.colon(record, Nil$.MODULE$), sc.makeRDD$default$2(), ClassTag$.MODULE$.apply(String.class)));
            }
        });
        manualClock.advance(inputStreamsSuite.batchDuration().milliseconds());
        Thread.sleep(1000L);

        // Second wave: whatever the iterator still holds.
        remaining.foreach(record -> {
            synchronized (rddQueue) {
                SparkContext sc = streamingContext.sparkContext();
                return rddQueue.$plus$eq(sc.makeRDD(new $colon.colon(record, Nil$.MODULE$), sc.makeRDD$default$2(), ClassTag$.MODULE$.apply(String.class)));
            }
        });
        manualClock.advance(inputStreamsSuite.batchDuration().milliseconds());
        Thread.sleep(1000L);
    }

    /**
     * Hands {@code logInfo} a message supplier that renders {@code seq} as a bracketed,
     * comma-separated list such as {@code [a,b]}.
     *
     * @param inputStreamsSuite suite whose {@code logInfo} is invoked
     * @param seq sequence to render
     */
    public static final /* synthetic */ void $anonfun$new$85(InputStreamsSuite inputStreamsSuite, Seq seq) {
        inputStreamsSuite.logInfo(() -> "[" + seq.mkString(",") + "]");
    }

    /**
     * Hands {@code logInfo} a message supplier that renders {@code seq} as a bracketed,
     * comma-separated list such as {@code [a,b]}.
     *
     * @param inputStreamsSuite suite whose {@code logInfo} is invoked
     * @param seq sequence to render
     */
    public static final /* synthetic */ void $anonfun$new$89(InputStreamsSuite inputStreamsSuite, Seq seq) {
        inputStreamsSuite.logInfo(() -> "[" + seq.mkString(",") + "]");
    }

    /**
     * Path predicate that accepts every path; it is passed to {@code fileStream} as the filter
     * argument by {@code $anonfun$testFileStream$1}.
     *
     * @param path candidate path (ignored)
     * @return always {@code true}
     */
    public static final /* synthetic */ boolean $anonfun$testFileStream$4(Path path) {
        return true;
    }

    public static final /* synthetic */ Assertion $anonfun$testFileStream$6(InputStreamsSuite inputStreamsSuite, File file, ManualClock manualClock, Duration duration, BatchCounter batchCounter, int i) {
        File file2 = new File(file, Integer.toString(i));
        Files.write(new StringBuilder(1).append(i).append("\n").toString(), file2, StandardCharsets.UTF_8);
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(file2.setLastModified(manualClock.getTimeMillis()), "file.setLastModified(clock.getTimeMillis())", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 506));
        TripleEqualsSupport.Equalizer convertToEqualizer = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToLong(file2.lastModified()));
        long timeMillis = manualClock.getTimeMillis();
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToLong(timeMillis), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToLong(timeMillis), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 507));
        inputStreamsSuite.logInfo(() -> {
            return new StringBuilder(13).append("Created file ").append(file2).toString();
        });
        manualClock.advance(duration.milliseconds());
        return (Assertion) Eventually$.MODULE$.eventually(inputStreamsSuite.eventuallyTimeout(), () -> {
            TripleEqualsSupport.Equalizer convertToEqualizer2 = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToInteger(batchCounter.getNumCompletedBatches()));
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", BoxesRunTime.boxToInteger(i), convertToEqualizer2.$eq$eq$eq(BoxesRunTime.boxToInteger(i), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 513));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 512));
    }

    /**
     * File-stream test body run against a temporary directory.
     *
     * <p>Creates a pre-existing file {@code "0"} with a fixed old timestamp, starts a file stream
     * over the directory, then creates files {@code 1..5} one batch at a time and checks the
     * collected output. When {@code z} is {@code true} the expected output is {@code "1".."5"};
     * when it is {@code false} the pre-existing {@code "0"} is expected as well.
     *
     * @param inputStreamsSuite suite instance supplying configuration and assertion helpers
     * @param z flag forwarded as the third argument of {@code fileStream} (presumably
     *          {@code newFilesOnly} -- confirm against the Scala source)
     * @param file temporary directory that the stream monitors
     */
    public static final /* synthetic */ void $anonfun$testFileStream$1(InputStreamsSuite inputStreamsSuite, boolean z, File file) {
        Bool simpleMacroBool;
        Duration apply = Seconds$.MODULE$.apply(2L);
        // Pre-existing file, written before the stream is created.
        File file2 = new File(file, "0");
        Files.write("0\n", file2, StandardCharsets.UTF_8);
        // Assert that setLastModified(10000L) succeeded AND that lastModified() now reads 10000.
        // The second operand is only computed when the first one is true.
        Bool simpleMacroBool2 = Bool$.MODULE$.simpleMacroBool(file2.setLastModified(10000L), "existingFile.setLastModified(10000L)", Prettifier$.MODULE$.default());
        if (simpleMacroBool2.value()) {
            TripleEqualsSupport.Equalizer convertToEqualizer = inputStreamsSuite.convertToEqualizer(BoxesRunTime.boxToLong(file2.lastModified()));
            simpleMacroBool = Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(10000), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(10000), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        } else {
            simpleMacroBool = Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
        }
        Bool bool = simpleMacroBool;
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(simpleMacroBool2, "&&", bool, simpleMacroBool2.$amp$amp(() -> {
            return bool;
        }), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 482));
        inputStreamsSuite.withStreamingContext(new StreamingContext(inputStreamsSuite.conf(), apply), streamingContext -> {
            ManualClock clock = streamingContext.scheduler().clock();
            // Set the clock one batch duration after the pre-existing file's timestamp.
            clock.setTime(file2.lastModified() + apply.milliseconds());
            BatchCounter batchCounter = new BatchCounter(streamingContext);
            // Stream over the directory with an accept-all path filter; each record is mapped to
            // the string form of its value.
            DStream map = streamingContext.fileStream(file.toString(), path -> {
                return BoxesRunTime.boxToBoolean($anonfun$testFileStream$4(path));
            }, z, ClassTag$.MODULE$.apply(LongWritable.class), ClassTag$.MODULE$.apply(Text.class), ClassTag$.MODULE$.apply(TextInputFormat.class)).map(tuple2 -> {
                return ((Text) tuple2._2()).toString();
            }, ClassTag$.MODULE$.apply(String.class));
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            new TestOutputStream(map, concurrentLinkedQueue, ClassTag$.MODULE$.apply(String.class)).register();
            streamingContext.start();
            // Move the clock half a batch forward before any new file is created.
            clock.advance(apply.milliseconds() / 2);
            // For each of 1..5: create the file, advance the clock, wait for that batch.
            Seq apply2 = Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4, 5}));
            apply2.foreach(obj -> {
                return $anonfun$testFileStream$6(inputStreamsSuite, file, clock, apply, batchCounter, BoxesRunTime.unboxToInt(obj));
            });
            // Expected output: "1".."5" when z is true, otherwise "0" followed by "1".."5".
            Set set = z ? ((TraversableOnce) apply2.map(obj2 -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj2));
            }, Seq$.MODULE$.canBuildFrom())).toSet() : ((TraversableOnce) ((TraversableLike) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0})).$plus$plus(apply2, Seq$.MODULE$.canBuildFrom())).map(obj3 -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj3));
            }, Seq$.MODULE$.canBuildFrom())).toSet();
            // Compare the flattened collected output, as a set, with the expected set.
            TripleEqualsSupport.Equalizer convertToEqualizer2 = inputStreamsSuite.convertToEqualizer(((GenericTraversableTemplate) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).flatten(Predef$.MODULE$.$conforms()).toSet());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", set, convertToEqualizer2.$eq$eq$eq(set, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 523));
        });
    }

    public InputStreamsSuite() {
        TestSuiteBase.$init$(this);
        BeforeAndAfter.$init$(this);
        test("socket input stream", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            return (Assertion) this.withTestServer(new TestServer(TestServer$.MODULE$.$lessinit$greater$default$1()), testServer -> {
                testServer.start();
                return (Assertion) this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), streamingContext -> {
                    Seq apply = Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4, 5}));
                    new BatchCounter(streamingContext);
                    ReceiverInputDStream socketTextStream = streamingContext.socketTextStream("localhost", testServer.port(), StorageLevel$.MODULE$.MEMORY_AND_DISK());
                    ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
                    new TestOutputStream(socketTextStream, concurrentLinkedQueue, ClassTag$.MODULE$.apply(String.class)).register();
                    streamingContext.start();
                    ManualClock clock = streamingContext.scheduler().clock();
                    Seq seq = (Seq) apply.map(obj -> {
                        return Integer.toString(BoxesRunTime.unboxToInt(obj));
                    }, Seq$.MODULE$.canBuildFrom());
                    apply.indices().foreach$mVc$sp(i -> {
                        testServer.send(new StringBuilder(1).append(apply.apply(i).toString()).append("\n").toString());
                        clock.advance(this.batchDuration().milliseconds());
                    });
                    Eventually$.MODULE$.eventually(this.eventuallyTimeout(), () -> {
                        clock.advance(this.batchDuration().milliseconds());
                        this.logInfo(() -> {
                            return "--------------------------------";
                        });
                        this.logInfo(() -> {
                            return new StringBuilder(14).append("output.size = ").append(concurrentLinkedQueue.size()).toString();
                        });
                        this.logInfo(() -> {
                            return "output";
                        });
                        ((IterableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).foreach(seq2 -> {
                            $anonfun$new$10(this, seq2);
                            return BoxedUnit.UNIT;
                        });
                        this.logInfo(() -> {
                            return new StringBuilder(23).append("expected output.size = ").append(seq.size()).toString();
                        });
                        this.logInfo(() -> {
                            return "expected output";
                        });
                        seq.foreach(str -> {
                            $anonfun$new$14(this, str);
                            return BoxedUnit.UNIT;
                        });
                        this.logInfo(() -> {
                            return "--------------------------------";
                        });
                        String[] strArr = (String[]) ((GenericTraversableTemplate) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).flatten(Predef$.MODULE$.$conforms()).toArray(ClassTag$.MODULE$.apply(String.class));
                        TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(BoxesRunTime.boxToInteger(strArr.length));
                        int size = seq.size();
                        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(size), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(size), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 88));
                        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)).indices().foreach(obj2 -> {
                            return $anonfun$new$17(this, strArr, seq, BoxesRunTime.unboxToInt(obj2));
                        });
                    }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 73));
                    return (Assertion) Eventually$.MODULE$.eventually(this.eventuallyTimeout(), () -> {
                        TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(BoxesRunTime.boxToLong(streamingContext.progressListener().numTotalReceivedRecords()));
                        int length = apply.length();
                        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(length), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(length), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 95));
                        TripleEqualsSupport.Equalizer convertToEqualizer2 = this.convertToEqualizer(BoxesRunTime.boxToLong(streamingContext.progressListener().numTotalProcessedRecords()));
                        int length2 = apply.length();
                        return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", BoxesRunTime.boxToInteger(length2), convertToEqualizer2.$eq$eq$eq(BoxesRunTime.boxToInteger(length2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 96));
                    }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 94));
                });
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 48));
        test("socket input stream - no block in a batch", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.withTestServer(new TestServer(TestServer$.MODULE$.$lessinit$greater$default$1()), testServer -> {
                $anonfun$new$20(this, testServer);
                return BoxedUnit.UNIT;
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 102));
        test("binary records stream", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.withTempDir(file -> {
                $anonfun$new$24(this, file);
                return BoxedUnit.UNIT;
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 130));
        test("file input stream - newFilesOnly = true", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.testFileStream(true);
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 178));
        test("file input stream - newFilesOnly = false", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.testFileStream(false);
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 182));
        test("file input stream - wildcard", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.withTempDir(file -> {
                $anonfun$new$38(this, file);
                return BoxedUnit.UNIT;
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 186));
        test("Modified files are correctly detected.", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.withTempDir(file -> {
                $anonfun$new$48(this, file);
                return BoxedUnit.UNIT;
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 244));
        test("multi-thread receiver", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            int i = 10 * 1000;
            MultiThreadTestReceiver multiThreadTestReceiver = new MultiThreadTestReceiver(10, 1000);
            MultiThreadTestReceiver$.MODULE$.haveAllThreadsFinished_$eq(false);
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), streamingContext -> {
                $anonfun$new$53(this, multiThreadTestReceiver, concurrentLinkedQueue, i, streamingContext);
                return BoxedUnit.UNIT;
            });
            this.logInfo(() -> {
                return "--------------------------------";
            });
            this.logInfo(() -> {
                return new StringBuilder(14).append("output.size = ").append(concurrentLinkedQueue.size()).toString();
            });
            this.logInfo(() -> {
                return "output";
            });
            ((IterableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).foreach(seq -> {
                $anonfun$new$57(this, seq);
                return BoxedUnit.UNIT;
            });
            this.logInfo(() -> {
                return "--------------------------------";
            });
            TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(output$2(concurrentLinkedQueue).sum(Numeric$LongIsIntegral$.MODULE$));
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(i), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(i), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 348));
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 312));
        test("queue input stream - oneAtATime = true", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            Seq colonVar = new $colon.colon("1", new $colon.colon("2", new $colon.colon("3", new $colon.colon("4", new $colon.colon("5", Nil$.MODULE$)))));
            Seq seq = (Seq) colonVar.map(str -> {
                return new $colon.colon(str, Nil$.MODULE$);
            }, Seq$.MODULE$.canBuildFrom());
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), streamingContext -> {
                $anonfun$new$63(this, concurrentLinkedQueue, colonVar, streamingContext);
                return BoxedUnit.UNIT;
            });
            this.logInfo(() -> {
                return "--------------------------------";
            });
            this.logInfo(() -> {
                return new StringBuilder(14).append("output.size = ").append(concurrentLinkedQueue.size()).toString();
            });
            this.logInfo(() -> {
                return "output";
            });
            ((IterableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).foreach(seq2 -> {
                $anonfun$new$69(this, seq2);
                return BoxedUnit.UNIT;
            });
            this.logInfo(() -> {
                return new StringBuilder(23).append("expected output.size = ").append(seq.size()).toString();
            });
            this.logInfo(() -> {
                return "expected output";
            });
            seq.foreach(seq3 -> {
                $anonfun$new$73(this, seq3);
                return BoxedUnit.UNIT;
            });
            this.logInfo(() -> {
                return "--------------------------------";
            });
            TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(BoxesRunTime.boxToInteger(output$3(concurrentLinkedQueue).size()));
            int size = seq.size();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(size), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(size), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 392));
            ((IterableLike) output$3(concurrentLinkedQueue).zipWithIndex(Iterable$.MODULE$.canBuildFrom())).foreach(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Seq seq4 = (Seq) tuple2._1();
                Seq seq5 = (Seq) seq.apply(tuple2._2$mcI$sp());
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(seq4, "==", seq5, seq4 != null ? seq4.equals(seq5) : seq5 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 393));
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 351));
        test("queue input stream - oneAtATime = false", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            Seq colonVar = new $colon.colon("1", new $colon.colon("2", new $colon.colon("3", new $colon.colon("4", new $colon.colon("5", Nil$.MODULE$)))));
            Seq colonVar2 = new $colon.colon(new $colon.colon("1", new $colon.colon("2", new $colon.colon("3", Nil$.MODULE$))), new $colon.colon(new $colon.colon("4", new $colon.colon("5", Nil$.MODULE$)), Nil$.MODULE$));
            this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), streamingContext -> {
                $anonfun$new$79(this, concurrentLinkedQueue, colonVar, streamingContext);
                return BoxedUnit.UNIT;
            });
            this.logInfo(() -> {
                return "--------------------------------";
            });
            this.logInfo(() -> {
                return new StringBuilder(14).append("output.size = ").append(concurrentLinkedQueue.size()).toString();
            });
            this.logInfo(() -> {
                return "output";
            });
            ((IterableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(concurrentLinkedQueue).asScala()).foreach(seq -> {
                $anonfun$new$85(this, seq);
                return BoxedUnit.UNIT;
            });
            this.logInfo(() -> {
                return new StringBuilder(23).append("expected output.size = ").append(colonVar2.size()).toString();
            });
            this.logInfo(() -> {
                return "expected output";
            });
            colonVar2.foreach(seq2 -> {
                $anonfun$new$89(this, seq2);
                return BoxedUnit.UNIT;
            });
            this.logInfo(() -> {
                return "--------------------------------";
            });
            TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(BoxesRunTime.boxToInteger(output$4(concurrentLinkedQueue).size()));
            int size = colonVar2.size();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(size), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(size), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 444));
            ((IterableLike) output$4(concurrentLinkedQueue).zipWithIndex(Iterable$.MODULE$.canBuildFrom())).foreach(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Seq seq3 = (Seq) tuple2._1();
                Seq seq4 = (Seq) colonVar2.apply(tuple2._2$mcI$sp());
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(seq3, "==", seq4, seq3 != null ? seq3.equals(seq4) : seq4 == null, Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 445));
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 396));
        // Registers a test that builds two receiver-based and three plain input DStreams against a fresh
        // StreamingContext, then checks what the context's graph reports: total input-stream count,
        // receiver-stream count and identity, and the ids assigned to the streams.
        test("test track the number of input stream", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            return (Assertion) this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), streamingContext -> {
                // NOTE(review): these null-initialised locals look like decompiler placeholders for the
                // enclosing-suite reference passed to the anonymous classes below — confirm against the
                // original Scala source.
                final InputStreamsSuite inputStreamsSuite = null;
                final InputStreamsSuite inputStreamsSuite2 = null;
                // Two receiver-based input streams; each getReceiver() returns null.
                InputStreamsSuite$TestReceiverInputDStream$1[] inputStreamsSuite$TestReceiverInputDStream$1Arr = (InputStreamsSuite$TestReceiverInputDStream$1[]) new InputStreamsSuite$TestReceiverInputDStream$1[]{new ReceiverInputDStream<String>(inputStreamsSuite, streamingContext) { // from class: org.apache.spark.streaming.InputStreamsSuite$TestReceiverInputDStream$1
                    public Receiver<String> getReceiver() {
                        return null;
                    }

                    {
                        // NOTE(review): the ClassTag is created but never used here; presumably it was
                        // the implicit ClassTag argument of the superclass constructor — confirm.
                        ClassTag apply = ClassTag$.MODULE$.apply(String.class);
                    }
                }, new ReceiverInputDStream<String>(inputStreamsSuite2, streamingContext) { // from class: org.apache.spark.streaming.InputStreamsSuite$TestReceiverInputDStream$1
                    public Receiver<String> getReceiver() {
                        return null;
                    }

                    {
                        ClassTag apply = ClassTag$.MODULE$.apply(String.class);
                    }
                }};
                // NOTE(review): same placeholder pattern as above, one local per anonymous InputDStream.
                final InputStreamsSuite inputStreamsSuite3 = null;
                final InputStreamsSuite inputStreamsSuite4 = null;
                final InputStreamsSuite inputStreamsSuite5 = null;
                // Three plain input streams: start()/stop() are empty and compute() always yields None.
                InputStreamsSuite$TestInputDStream$1[] inputStreamsSuite$TestInputDStream$1Arr = (InputStreamsSuite$TestInputDStream$1[]) new InputStreamsSuite$TestInputDStream$1[]{new InputDStream<String>(inputStreamsSuite3, streamingContext) { // from class: org.apache.spark.streaming.InputStreamsSuite$TestInputDStream$1
                    public void start() {
                    }

                    public void stop() {
                    }

                    public Option<RDD<String>> compute(Time time) {
                        return None$.MODULE$;
                    }

                    {
                        ClassTag apply = ClassTag$.MODULE$.apply(String.class);
                    }
                }, new InputDStream<String>(inputStreamsSuite4, streamingContext) { // from class: org.apache.spark.streaming.InputStreamsSuite$TestInputDStream$1
                    public void start() {
                    }

                    public void stop() {
                    }

                    public Option<RDD<String>> compute(Time time) {
                        return None$.MODULE$;
                    }

                    {
                        ClassTag apply = ClassTag$.MODULE$.apply(String.class);
                    }
                }, new InputDStream<String>(inputStreamsSuite5, streamingContext) { // from class: org.apache.spark.streaming.InputStreamsSuite$TestInputDStream$1
                    public void start() {
                    }

                    public void stop() {
                    }

                    public Option<RDD<String>> compute(Time time) {
                        return None$.MODULE$;
                    }

                    {
                        ClassTag apply = ClassTag$.MODULE$.apply(String.class);
                    }
                }};
                // The graph must report as many input streams as were created above (receiver + plain).
                InputDStream[] inputStreams = streamingContext.graph().getInputStreams();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(inputStreams, "length", BoxesRunTime.boxToInteger(inputStreams.length), BoxesRunTime.boxToInteger(inputStreamsSuite$TestReceiverInputDStream$1Arr.length + inputStreamsSuite$TestInputDStream$1Arr.length), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 467));
                // The number of receiver input streams must match the receiver array length.
                ReceiverInputDStream[] receiverInputStreams = streamingContext.graph().getReceiverInputStreams();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(receiverInputStreams, "length", BoxesRunTime.boxToInteger(receiverInputStreams.length), BoxesRunTime.boxToInteger(inputStreamsSuite$TestReceiverInputDStream$1Arr.length), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 469));
                // The receiver streams reported by the graph must === the array of streams created above.
                TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(streamingContext.graph().getReceiverInputStreams());
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", inputStreamsSuite$TestReceiverInputDStream$1Arr, convertToEqualizer.$eq$eq$eq(inputStreamsSuite$TestReceiverInputDStream$1Arr, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 470));
                // The ids of all input streams must === the array produced by tabulate(5, i -> i).
                TripleEqualsSupport.Equalizer convertToEqualizer2 = this.convertToEqualizer(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(streamingContext.graph().getInputStreams())).map(inputDStream -> {
                    return BoxesRunTime.boxToInteger(inputDStream.id());
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Int())));
                int[] iArr = (int[]) Array$.MODULE$.tabulate(5, i -> {
                    return i;
                }, ClassTag$.MODULE$.Int());
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", iArr, convertToEqualizer2.$eq$eq$eq(iArr, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 471));
                // The ids of the two receiver streams must === {0, 1}.
                TripleEqualsSupport.Equalizer convertToEqualizer3 = this.convertToEqualizer(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(inputStreamsSuite$TestReceiverInputDStream$1Arr)).map(inputStreamsSuite$TestReceiverInputDStream$1 -> {
                    return BoxesRunTime.boxToInteger(inputStreamsSuite$TestReceiverInputDStream$1.id());
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Int())));
                int[] iArr2 = {0, 1};
                // The result of this last assertion is what the withStreamingContext callback returns.
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer3, "===", iArr2, convertToEqualizer3.$eq$eq$eq(iArr2, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 472));
            });
        }, new Position("InputStreamsSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 448));
    }
}
