/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka09;

import java.io.File;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.Time$;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.DStream$;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka09.CanCommitOffsets;
import org.apache.spark.streaming.kafka09.ConstantEstimator;
import org.apache.spark.streaming.kafka09.ConstantRateController;
import org.apache.spark.streaming.kafka09.ConsumerStrategies$;
import org.apache.spark.streaming.kafka09.DefaultPerPartitionConfig;
import org.apache.spark.streaming.kafka09.DirectKafkaInputDStream;
import org.apache.spark.streaming.kafka09.DirectKafkaStreamSuite$;
import org.apache.spark.streaming.kafka09.HasOffsetRanges;
import org.apache.spark.streaming.kafka09.KafkaTestUtils;
import org.apache.spark.streaming.kafka09.KafkaUtils$;
import org.apache.spark.streaming.kafka09.LocationStrategies$;
import org.apache.spark.streaming.kafka09.LocationStrategy;
import org.apache.spark.streaming.kafka09.OffsetRange;
import org.apache.spark.streaming.kafka09.PerPartitionConfig;
import org.apache.spark.streaming.scheduler.RateController;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.apache.spark.streaming.scheduler.StreamingListenerStreamingStarted;
import org.apache.spark.streaming.scheduler.rate.RateEstimator;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfter;
import org.scalatest.BeforeAndAfterAll;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.AbstractPatienceConfiguration;
import org.scalatest.concurrent.AbstractPatienceConfiguration$PatienceConfig$;
import org.scalatest.concurrent.Eventually;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.concurrent.ScaledTimeSpans;
import org.scalatest.time.Span;
import org.scalatest.time.Span$;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.math.Numeric;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.util.Random$;

@ScalaSignature(bytes="\u0006\u0001\tuc\u0001B\u0013'\u0001EBQa\u0013\u0001\u0005\u00021Cqa\u0014\u0001C\u0002\u0013\u0005\u0001\u000b\u0003\u0004U\u0001\u0001\u0006I!\u0015\u0005\n+\u0002\u0001\r\u00111A\u0005\nYC\u0011b\u0017\u0001A\u0002\u0003\u0007I\u0011\u0002/\t\u0013\u0015\u0004\u0001\u0019!A!B\u00139\u0006\"\u00034\u0001\u0001\u0004\u0005\r\u0011\"\u0003h\u0011%\u0001\b\u00011AA\u0002\u0013%\u0011\u000fC\u0005t\u0001\u0001\u0007\t\u0011)Q\u0005Q\"IA\u000f\u0001a\u0001\u0002\u0004%I!\u001e\u0005\ns\u0002\u0001\r\u00111A\u0005\niD\u0011\u0002 \u0001A\u0002\u0003\u0005\u000b\u0015\u0002<\t\u000bu\u0004A\u0011\t@\t\u000b}\u0004A\u0011\t@\t\u000f\u0005\u0005\u0001\u0001\"\u0001\u0002\u0004!I\u00111\t\u0001C\u0002\u0013\u0005\u0011Q\t\u0005\t\u0003\u001b\u0002\u0001\u0015!\u0003\u0002H!9\u0011q\n\u0001\u0005\n\u0005E\u0003bBA^\u0001\u0011%\u0011QX\u0004\b\u0003W4\u0003\u0012AAw\r\u0019)c\u0005#\u0001\u0002p\"11*\u0006C\u0001\u0003{D\u0011\"a@\u0016\u0005\u0004%\tA!\u0001\t\u0011\tEQ\u0003)A\u0005\u0005\u00071aAa\u0005\u0016\u0001\tU\u0001BB&\u001a\t\u0003\u0011i\u0002C\u0005\u0003$e\u0011\r\u0011\"\u0001\u0003\u0002!A!QE\r!\u0002\u0013\u0011\u0019\u0001C\u0005\u0003(e\u0011\r\u0011\"\u0001\u0003\u0002!A!\u0011F\r!\u0002\u0013\u0011\u0019\u0001C\u0005\u0003,e\u0011\r\u0011\"\u0001\u0003\u0002!A!QF\r!\u0002\u0013\u0011\u0019\u0001C\u0004\u00030e!\tE!\r\t\u000f\tu\u0012\u0004\"\u0011\u0003@!9!1J\r\u0005B\t5\u0003\"\u0003B-+\u0005\u0005I\u0011\u0002B.\u0005Y!\u0015N]3di.\u000bgm[1TiJ,\u0017-\\*vSR,'BA\u0014)\u0003\u001dY\u0017MZ6baeR!!\u000b\u0016\u0002\u0013M$(/Z1nS:<'BA\u0016-\u0003\u0015\u0019\b/\u0019:l\u0015\tic&\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002_\u0005\u0019qN]4\u0004\u0001M1\u0001A\r\u001c=\u007f\u0015\u0003\"a\r\u001b\u000e\u0003)J!!\u000e\u0016\u0003\u001bM\u0003\u0018M]6Gk:\u001cV/\u001b;f!\t9$(D\u00019\u0015\tId&A\u0005tG\u0006d\u0017\r^3ti&\u00111\b\u000f\u0002\u000f\u0005\u00164wN]3B]\u0012\fe\r^3s!\t9T(\u0003\u0002?q\t\t\")\u001a4pe\u0016\fe\u000eZ!gi\u0016\u0014\u0018\t\u001c7\u0011\u0005\u0001\u001bU\"A!\u000b\u0005\tC\u0014AC2p]\u000e,(O]3oi&\u0011A)\u0011\u0002\u000b\u000bZ,g\u000e^;bY2L\bC\u0001$J\u001b\u00059%B\u0001%+\u0003!Ig\u000e^3s]\u0006d\u0017B\u0001&H\u0005\u001daunZ4j]\u001e\fa\u0001P5oSRtD#A'\u0011\u00059\u0003Q\"\u0001\u0014\u0002\u0013M\u0004\u0018M]6D_:4W#A)\u0011\u0005M\u0012\u0016BA*+\u0005%\u0019\u0006/\u0019:l\u0007>tg-\u0001\u0006ta\u0006\u00148nQ8oM\u0002\n1a]:d+\u00059\u0006C\u0001-Z\u001b\u0005A\u0013B\u0001.)\u0005A\u0019FO]3b[&twmQ8oi\u0016DH/A\u0004tg\u000e|F%Z9\u0015\u0005u\u001b\u0007C\u00010b\u001b\u0005y&\"\u00011\u0002\u000bM\u001c\u0017\r\\1\n\u0005\t|&\u0001B+oSRDq\u0001Z\u0003\u0002\u0002\u0003\u0007q+A\u0002yIE\nAa]:dA\u00059A/Z:u\t&\u0014X#\u00015\u0011\u0005%tW\"\u00016\u000b\u0005-d\u0017AA5p\u0015\u0005i\u0017\u0001\u00026bm\u0006L!a\u001c6\u0003\t\u0019KG.Z\u0001\fi\u0016\u001cH\u000fR5s?\u0012*\u0017\u000f\u0006\u0002^e\"9A\rCA\u0001\u0002\u0004A\u0017\u0001\u0003;fgR$\u0015N\u001d\u0011\u0002\u001d-\fgm[1UKN$X\u000b^5mgV\ta\u000f\u0005\u0002Oo&\u0011\u0001P\n\u0002\u000f\u0017\u000647.\u0019+fgR,F/\u001b7t\u0003IY\u0017MZ6b)\u0016\u001cH/\u0016;jYN|F%Z9\u0015\u0005u[\bb\u00023\f\u0003\u0003\u0005\rA^\u0001\u0010W\u000647.\u0019+fgR,F/\u001b7tA\u0005I!-\u001a4pe\u0016\fE\u000e\u001c\u000b\u0002;\u0006A\u0011M\u001a;fe\u0006cG.\u0001\bhKR\\\u0015MZ6b!\u0006\u0014\u0018-\\:\u0015\t\u0005\u0015\u00111\u0007\t\t\u0003\u000f\ti!!\u0005\u0002(5\u0011\u0011\u0011\u0002\u0006\u0004\u0003\u0017a\u0017\u0001B;uS2LA!a\u0004\u0002\n\t9\u0001*Y:i\u001b\u0006\u0004\b\u0003BA\n\u0003CqA!!\u0006\u0002\u001eA\u0019\u0011qC0\u000e\u0005\u0005e!bAA\u000ea\u00051AH]8pizJ1!a\b`\u0003\u0019\u0001&/\u001a3fM&!\u00111EA\u0013\u0005\u0019\u0019FO]5oO*\u0019\u0011qD0\u0011\t\u0005%\u0012qF\u0007\u0003\u0003WQ1!!\fm\u0003\u0011a\u0017M\\4\n\t\u0005E\u00121\u0006\u0002\u0007\u001f\nTWm\u0019;\t\u000f\u0005Ur\u00021\u0001\u00028\u0005)Q\r\u001f;sCB)a,!\u000f\u0002>%\u0019\u00111H0\u0003\u0015q\u0012X\r]3bi\u0016$g\bE\u0004_\u0003\u007f\t\t\"a\n\n\u0007\u0005\u0005sL\u0001\u0004UkBdWMM\u0001\u000faJ,g-\u001a:sK\u0012Dun\u001d;t+\t\t9\u0005E\u0002O\u0003\u0013J1!a\u0013'\u0005AaunY1uS>t7\u000b\u001e:bi\u0016<\u00170A\bqe\u00164WM\u001d:fI\"{7\u000f^:!\u0003=9W\r^(gMN,GOU1oO\u0016\u001cXCBA*\u0003G\u000b9\f\u0006\u0003\u0002V\u0005m\u0004CBA,\u0003C\n9G\u0004\u0003\u0002Z\u0005uc\u0002BA\f\u00037J\u0011\u0001Y\u0005\u0004\u0003?z\u0016a\u00029bG.\fw-Z\u0005\u0005\u0003G\n)GA\u0002TKFT1!a\u0018`!\u001dq\u0016qHA5\u0003_\u00022\u0001WA6\u0013\r\ti\u0007\u000b\u0002\u0005)&lW\rE\u0003_\u0003c\n)(C\u0002\u0002t}\u0013Q!\u0011:sCf\u00042ATA<\u0013\r\tIH\n\u0002\f\u001f\u001a47/\u001a;SC:<W\rC\u0004\u0002~I\u0001\r!a \u0002\u0017-\fgm[1TiJ,\u0017-\u001c\t\u0007\u0003\u0003\u000b9)a#\u000e\u0005\u0005\r%bAACQ\u00059Am\u001d;sK\u0006l\u0017\u0002BAE\u0003\u0007\u0013q\u0001R*ue\u0016\fW\u000e\u0005\u0005\u0002\u000e\u0006m\u0015qTA[\u001b\t\tyI\u0003\u0003\u0002\u0012\u0006M\u0015\u0001C2p]N,X.\u001a:\u000b\t\u0005U\u0015qS\u0001\bG2LWM\u001c;t\u0015\r\tI\nL\u0001\u0006W\u000647.Y\u0005\u0005\u0003;\u000byI\u0001\bD_:\u001cX/\\3s%\u0016\u001cwN\u001d3\u0011\t\u0005\u0005\u00161\u0015\u0007\u0001\t\u001d\t)K\u0005b\u0001\u0003O\u0013\u0011aS\t\u0005\u0003S\u000by\u000bE\u0002_\u0003WK1!!,`\u0005\u001dqu\u000e\u001e5j]\u001e\u00042AXAY\u0013\r\t\u0019l\u0018\u0002\u0004\u0003:L\b\u0003BAQ\u0003o#q!!/\u0013\u0005\u0004\t9KA\u0001W\u0003Q9W\r\u001e#je\u0016\u001cGoS1gW\u0006\u001cFO]3b[RA\u0011qXAc\u0003\u0013\fy\u000eE\u0004O\u0003\u0003\f\t\"!\u0005\n\u0007\u0005\rgEA\fESJ,7\r^&bM.\f\u0017J\u001c9vi\u0012\u001bFO]3b[\"9\u0011qY\nA\u0002\u0005E\u0011!\u0002;pa&\u001c\u0007bBAf'\u0001\u0007\u0011QZ\u0001\u0013[>\u001c7NU1uK\u000e{g\u000e\u001e:pY2,'\u000fE\u0003_\u0003\u001f\f\u0019.C\u0002\u0002R~\u0013aa\u00149uS>t\u0007\u0003BAk\u00037l!!a6\u000b\u0007\u0005e\u0007&A\u0005tG\",G-\u001e7fe&!\u0011Q\\Al\u00059\u0011\u0016\r^3D_:$(o\u001c7mKJDq!!9\u0014\u0001\u0004\t\u0019/A\u0002qa\u000e\u0004RAXAh\u0003K\u00042ATAt\u0013\r\tIO\n\u0002\u0013!\u0016\u0014\b+\u0019:uSRLwN\\\"p]\u001aLw-\u0001\fESJ,7\r^&bM.\f7\u000b\u001e:fC6\u001cV/\u001b;f!\tqUcE\u0003\u0016\u0003c\f9\u0010E\u0002_\u0003gL1!!>`\u0005\u0019\te.\u001f*fMB\u0019a,!?\n\u0007\u0005mxL\u0001\u0007TKJL\u0017\r\\5{C\ndW\r\u0006\u0002\u0002n\u0006)Ao\u001c;bYV\u0011!1\u0001\t\u0005\u0005\u000b\u0011i!\u0004\u0002\u0003\b)!!\u0011\u0002B\u0006\u0003\u0019\tGo\\7jG*\u0019!)!\u0003\n\t\t=!q\u0001\u0002\u000b\u0003R|W.[2M_:<\u0017A\u0002;pi\u0006d\u0007E\u0001\nJ]B,H/\u00138g_\u000e{G\u000e\\3di>\u00148#B\r\u0002r\n]\u0001\u0003BAk\u00053IAAa\u0007\u0002X\n\t2\u000b\u001e:fC6Lgn\u001a'jgR,g.\u001a:\u0015\u0005\t}\u0001c\u0001B\u001135\tQ#A\nok6\u0014VmY8sIN\u001cVOY7jiR,G-\u0001\u000bok6\u0014VmY8sIN\u001cVOY7jiR,G\rI\u0001\u0012]Vl'+Z2pe\u0012\u001c8\u000b^1si\u0016$\u0017A\u00058v[J+7m\u001c:egN#\u0018M\u001d;fI\u0002\n1C\\;n%\u0016\u001cwN\u001d3t\u0007>l\u0007\u000f\\3uK\u0012\fAC\\;n%\u0016\u001cwN\u001d3t\u0007>l\u0007\u000f\\3uK\u0012\u0004\u0013\u0001E8o\u0005\u0006$8\r[*vE6LG\u000f^3e)\ri&1\u0007\u0005\b\u0005k\t\u0003\u0019\u0001B\u001c\u00039\u0011\u0017\r^2i'V\u0014W.\u001b;uK\u0012\u0004B!!6\u0003:%!!1HAl\u0005}\u0019FO]3b[&tw\rT5ti\u0016tWM\u001d\"bi\u000eD7+\u001e2nSR$X\rZ\u0001\u000f_:\u0014\u0015\r^2i'R\f'\u000f^3e)\ri&\u0011\t\u0005\b\u0005\u0007\u0012\u0003\u0019\u0001B#\u00031\u0011\u0017\r^2i'R\f'\u000f^3e!\u0011\t)Na\u0012\n\t\t%\u0013q\u001b\u0002\u001e'R\u0014X-Y7j]\u001ed\u0015n\u001d;f]\u0016\u0014()\u0019;dQN#\u0018M\u001d;fI\u0006\u0001rN\u001c\"bi\u000eD7i\\7qY\u0016$X\r\u001a\u000b\u0004;\n=\u0003b\u0002B)G\u0001\u0007!1K\u0001\u000fE\u0006$8\r[\"p[BdW\r^3e!\u0011\t)N!\u0016\n\t\t]\u0013q\u001b\u0002 'R\u0014X-Y7j]\u001ed\u0015n\u001d;f]\u0016\u0014()\u0019;dQ\u000e{W\u000e\u001d7fi\u0016$\u0017a\u0003:fC\u0012\u0014Vm]8mm\u0016$\"!a\n")
public class DirectKafkaStreamSuite
extends SparkFunSuite
implements BeforeAndAfter,
Eventually {
    private final SparkConf sparkConf;
    private StreamingContext org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc;
    private File testDir;
    private KafkaTestUtils kafkaTestUtils;
    private final LocationStrategy preferredHosts;
    private final AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    private volatile AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig$module;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;

    public static AtomicLong total() {
        return DirectKafkaStreamSuite$.MODULE$.total();
    }

    public <T> T eventually(PatienceConfiguration.Timeout timeout, PatienceConfiguration.Interval interval, Function0<T> fun, Position pos) {
        return (T)Eventually.eventually$((Eventually)this, (PatienceConfiguration.Timeout)timeout, (PatienceConfiguration.Interval)interval, fun, (Position)pos);
    }

    public <T> T eventually(PatienceConfiguration.Timeout timeout, Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Position pos) {
        return (T)Eventually.eventually$((Eventually)this, (PatienceConfiguration.Timeout)timeout, fun, (AbstractPatienceConfiguration.PatienceConfig)config, (Position)pos);
    }

    public <T> T eventually(PatienceConfiguration.Interval interval, Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Position pos) {
        return (T)Eventually.eventually$((Eventually)this, (PatienceConfiguration.Interval)interval, fun, (AbstractPatienceConfiguration.PatienceConfig)config, (Position)pos);
    }

    public <T> T eventually(Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Position pos) {
        return (T)Eventually.eventually$((Eventually)this, fun, (AbstractPatienceConfiguration.PatienceConfig)config, (Position)pos);
    }

    public AbstractPatienceConfiguration.PatienceConfig patienceConfig() {
        return PatienceConfiguration.patienceConfig$((PatienceConfiguration)this);
    }

    public PatienceConfiguration.Timeout timeout(Span value) {
        return PatienceConfiguration.timeout$((PatienceConfiguration)this, (Span)value);
    }

    public PatienceConfiguration.Interval interval(Span value) {
        return PatienceConfiguration.interval$((PatienceConfiguration)this, (Span)value);
    }

    public final Span scaled(Span span) {
        return ScaledTimeSpans.scaled$((ScaledTimeSpans)this, (Span)span);
    }

    public double spanScaleFactor() {
        return ScaledTimeSpans.spanScaleFactor$((ScaledTimeSpans)this);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$runTest(String testName, Args args) {
        return BeforeAndAfterEach.runTest$((BeforeAndAfterEach)this, (String)testName, (Args)args);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$run(Option testName, Args args) {
        return BeforeAndAfterAll.run$((BeforeAndAfterAll)this, (Option)testName, (Args)args);
    }

    public void before(Function0<Object> fun, Position pos) {
        BeforeAndAfter.before$((BeforeAndAfter)this, fun, (Position)pos);
    }

    public void after(Function0<Object> fun, Position pos) {
        BeforeAndAfter.after$((BeforeAndAfter)this, fun, (Position)pos);
    }

    public Status runTest(String testName, Args args) {
        return BeforeAndAfter.runTest$((BeforeAndAfter)this, (String)testName, (Args)args);
    }

    public Status run(Option<String> testName, Args args) {
        return BeforeAndAfter.run$((BeforeAndAfter)this, testName, (Args)args);
    }

    public AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig() {
        return this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    }

    public final void org$scalatest$concurrent$PatienceConfiguration$_setter_$org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig_$eq(AbstractPatienceConfiguration.PatienceConfig x$1) {
        this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig = x$1;
    }

    public AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig() {
        if (this.PatienceConfig$module == null) {
            this.PatienceConfig$lzycompute$1();
        }
        return this.PatienceConfig$module;
    }

    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean x$1) {
        this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked = x$1;
    }

    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> x$1) {
        this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = x$1;
    }

    public final void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference<Option<Function0<Object>>> x$1) {
        this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic = x$1;
    }

    public SparkConf sparkConf() {
        return this.sparkConf;
    }

    public StreamingContext org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc() {
        return this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc;
    }

    private void org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(StreamingContext x$1) {
        this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc = x$1;
    }

    private File testDir() {
        return this.testDir;
    }

    private void testDir_$eq(File x$1) {
        this.testDir = x$1;
    }

    private KafkaTestUtils kafkaTestUtils() {
        return this.kafkaTestUtils;
    }

    private void kafkaTestUtils_$eq(KafkaTestUtils x$1) {
        this.kafkaTestUtils = x$1;
    }

    public void beforeAll() {
        this.kafkaTestUtils_$eq(new KafkaTestUtils());
        this.kafkaTestUtils().setup();
    }

    public void afterAll() {
        block0: {
            if (this.kafkaTestUtils() == null) break block0;
            this.kafkaTestUtils().teardown();
            this.kafkaTestUtils_$eq(null);
        }
    }

    /*
     * WARNING - void declaration
     */
    public HashMap<String, Object> getKafkaParams(Seq<Tuple2<String, Object>> extra) {
        void var2_2;
        HashMap<String, Object> kp = new HashMap<String, Object>();
        kp.put("bootstrap.servers", this.kafkaTestUtils().brokerAddress());
        kp.put("key.deserializer", StringDeserializer.class);
        kp.put("value.deserializer", StringDeserializer.class);
        kp.put("group.id", new StringBuilder(15).append("test-consumer-").append(Random$.MODULE$.nextInt()).append("-").append(System.currentTimeMillis()).toString());
        extra.foreach((Function1 & Serializable & scala.Serializable)e -> kp.put((String)e._1(), e._2()));
        return var2_2;
    }

    public LocationStrategy preferredHosts() {
        return this.preferredHosts;
    }

    private <K, V> Seq<Tuple2<Time, OffsetRange[]>> getOffsetRanges(DStream<ConsumerRecord<K, V>> kafkaStream) {
        return (Seq)kafkaStream.generatedRDDs().mapValues((Function1 & Serializable & scala.Serializable)rdd -> ((HasOffsetRanges)rdd).offsetRanges()).toSeq().sortBy((Function1 & Serializable & scala.Serializable)x$12 -> (Time)x$12._1(), Time$.MODULE$.ordering());
    }

    private DirectKafkaInputDStream<String, String> getDirectKafkaStream(String topic, Option<RateController> mockRateController, Option<PerPartitionConfig> ppc) {
        int batchIntervalMilliseconds = 100;
        SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName(this.getClass().getSimpleName()).set("spark.streaming.kafka.maxRatePerPartition", "100");
        this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(sparkConf, Milliseconds$.MODULE$.apply((long)batchIntervalMilliseconds)));
        HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
        HashMap<String, Object> ekp = new HashMap<String, Object>(kafkaParams);
        DirectKafkaInputDStream<String, String> s = new DirectKafkaInputDStream<String, String>(this, ekp, kafkaParams, topic, ppc, sparkConf, mockRateController){
            private final Option<RateController> rateController;

            public Option<RateController> rateController() {
                return this.rateController;
            }
            {
                this.rateController = mockRateController$1;
            }
        };
        s.start();
        return s;
    }

    private final void PatienceConfig$lzycompute$1() {
        DirectKafkaStreamSuite directKafkaStreamSuite = this;
        synchronized (directKafkaStreamSuite) {
            if (this.PatienceConfig$module == null) {
                this.PatienceConfig$module = new AbstractPatienceConfiguration$PatienceConfig$((AbstractPatienceConfiguration)this);
            }
        }
    }

    public static final /* synthetic */ void $anonfun$new$3(DirectKafkaStreamSuite $this, scala.collection.immutable.Map data$1, String t) {
        $this.kafkaTestUtils().createTopic(t);
        $this.kafkaTestUtils().sendMessages(t, data$1);
    }

    public static final /* synthetic */ InputDStream $anonfun$new$4(DirectKafkaStreamSuite $this, List topics$1, HashMap kafkaParams$1, scala.collection.immutable.Map offsets$1) {
        return KafkaUtils$.MODULE$.createDirectStream($this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc(), $this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable)topics$1, (Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams$1).asScala(), (Map)offsets$1));
    }

    public static final /* synthetic */ Iterator $anonfun$new$10(ObjectRef offsetRanges$1, int i, Iterator iter) {
        OffsetRange off = ((OffsetRange[])offsetRanges$1.elem)[i];
        Seq all = iter.toSeq();
        int partSize = all.size();
        long rangeSize = off.untilOffset() - off.fromOffset();
        return scala.package$.MODULE$.Iterator().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2.mcIJ.sp(partSize, rangeSize)}));
    }

    public static final /* synthetic */ void $anonfun$new$7(DirectKafkaStreamSuite $this, ObjectRef offsetRanges$1, RDD rdd) {
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])((OffsetRange[])offsetRanges$1.elem))).foreach((Function1 & Serializable & scala.Serializable)o -> {
            $this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(3).append(o.topic()).append(" ").append(o.partition()).append(" ").append(o.fromOffset()).append(" ").append(o.untilOffset()).toString());
            return BoxedUnit.UNIT;
        });
        Tuple2[] collected = (Tuple2[])rdd.mapPartitionsWithIndex((Function2 & Serializable & scala.Serializable)(i, iter) -> DirectKafkaStreamSuite.$anonfun$new$10(offsetRanges$1, BoxesRunTime.unboxToInt((Object)i), iter), rdd.mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).collect();
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])collected)).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            int partSize = tuple2._1$mcI$sp();
            long rangeSize = tuple2._2$mcJ$sp();
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)partSize));
            long $org_scalatest_assert_macro_right = rangeSize;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertion assertion = $this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"offset ranges are wrong", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 140));
            return assertion;
        });
    }

    public static final /* synthetic */ void $anonfun$new$16(DirectKafkaStreamSuite $this, scala.collection.immutable.Map data$2, String t) {
        $this.kafkaTestUtils().createTopic(t);
        $this.kafkaTestUtils().sendMessages(t, data$2);
    }

    public static final /* synthetic */ Iterator $anonfun$new$23(ObjectRef offsetRanges$2, int i, Iterator iter) {
        OffsetRange off = ((OffsetRange[])offsetRanges$2.elem)[i];
        Seq all = iter.toSeq();
        int partSize = all.size();
        long rangeSize = off.untilOffset() - off.fromOffset();
        return scala.package$.MODULE$.Iterator().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2.mcIJ.sp(partSize, rangeSize)}));
    }

    public static final /* synthetic */ void $anonfun$new$20(DirectKafkaStreamSuite $this, ObjectRef offsetRanges$2, RDD rdd) {
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])((OffsetRange[])offsetRanges$2.elem))).foreach((Function1 & Serializable & scala.Serializable)o -> {
            $this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(3).append(o.topic()).append(" ").append(o.partition()).append(" ").append(o.fromOffset()).append(" ").append(o.untilOffset()).toString());
            return BoxedUnit.UNIT;
        });
        Tuple2[] collected = (Tuple2[])rdd.mapPartitionsWithIndex((Function2 & Serializable & scala.Serializable)(i, iter) -> DirectKafkaStreamSuite.$anonfun$new$23(offsetRanges$2, BoxesRunTime.unboxToInt((Object)i), iter), rdd.mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).collect();
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])collected)).foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple2 tuple2 = x0$2;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            int partSize = tuple2._1$mcI$sp();
            long rangeSize = tuple2._2$mcJ$sp();
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)partSize));
            long $org_scalatest_assert_macro_right = rangeSize;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertion assertion = $this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"offset ranges are wrong", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 206));
            return assertion;
        });
    }

    private static final long getLatestOffset$1(KafkaConsumer kc$1, TopicPartition topicPartition$1) {
        kc$1.seekToEnd(Arrays.asList((Object[])new TopicPartition[]{topicPartition$1}));
        return kc$1.position(topicPartition$1);
    }

    private static final long getLatestOffset$2(KafkaConsumer kc$2, TopicPartition topicPartition$2) {
        kc$2.seekToEnd(Arrays.asList((Object[])new TopicPartition[]{topicPartition$2}));
        return kc$2.position(topicPartition$2);
    }

    public static final /* synthetic */ String $anonfun$new$41(int x$3) {
        return ((Object)BoxesRunTime.boxToInteger((int)x$3)).toString();
    }

    private final void sendData$1(Seq data, String topic$2) {
        Seq strings = (Seq)data.map((Function1 & Serializable & scala.Serializable)x$3 -> DirectKafkaStreamSuite.$anonfun$new$41(BoxesRunTime.unboxToInt((Object)x$3)), Seq$.MODULE$.canBuildFrom());
        this.kafkaTestUtils().sendMessages(topic$2, ((TraversableOnce)strings.map((Function1 & Serializable & scala.Serializable)x$4 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$4), (Object)BoxesRunTime.boxToInteger((int)1)), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
    }

    public static final /* synthetic */ void $anonfun$new$48(Tuple2 x) {
        DirectKafkaStreamSuite$.MODULE$.total().set(x._2$mcI$sp());
    }

    public static final /* synthetic */ void $anonfun$new$47(RDD rdd) {
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])rdd.collect())).headOption().foreach((Function1 & Serializable & scala.Serializable)x -> {
            DirectKafkaStreamSuite.$anonfun$new$48(x);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ boolean $anonfun$new$51(DirectKafkaStreamSuite $this, OffsetRange x$5) {
        return $this.convertToEqualizer(BoxesRunTime.boxToLong((long)x$5.fromOffset())).$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)0), Equality$.MODULE$.default());
    }

    public static final /* synthetic */ String $anonfun$new$59(int x$6) {
        return ((Object)BoxesRunTime.boxToInteger((int)x$6)).toString();
    }

    private final void sendDataAndWaitForReceive$1(Seq data, String topic$3, ConcurrentLinkedQueue collectedData$3) {
        Seq strings = (Seq)data.map((Function1 & Serializable & scala.Serializable)x$6 -> DirectKafkaStreamSuite.$anonfun$new$59(BoxesRunTime.unboxToInt((Object)x$6)), Seq$.MODULE$.canBuildFrom());
        this.kafkaTestUtils().sendMessages(topic$3, ((TraversableOnce)strings.map((Function1 & Serializable & scala.Serializable)x$7 -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x$7), (Object)BoxesRunTime.boxToInteger((int)1)), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(strings.forall((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)collectedData$3.contains(x$1))), "strings.forall({\n  ((x$1: Any) => collectedData.contains(x$1))\n})", Prettifier$.MODULE$.default());
            return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 434));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 433));
    }

    public static final /* synthetic */ void $anonfun$new$64(DirectKafkaStreamSuite $this, ConcurrentLinkedQueue collectedData$3, InputDStream kafkaStream$1, HashMap committed$1, RDD rdd, Time time) {
        OffsetRange[] offsets = ((HasOffsetRanges)rdd).offsetRanges();
        String[] data = (String[])rdd.map((Function1 & Serializable & scala.Serializable)x$8 -> (String)x$8.value(), ClassTag$.MODULE$.apply(String.class)).collect();
        collectedData$3.addAll(Arrays.asList((Object[])data));
        ((CanCommitOffsets)kafkaStream$1).commitAsync(offsets, new OffsetCommitCallback($this, committed$1){
            private final /* synthetic */ DirectKafkaStreamSuite $outer;
            private final HashMap committed$1;

            public void onComplete(java.util.Map<TopicPartition, OffsetAndMetadata> m, Exception e) {
                if (e != null) {
                    this.$outer.logError((Function0 & Serializable & scala.Serializable)() -> "commit failed", e);
                } else {
                    this.committed$1.putAll(m);
                }
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.committed$1 = committed$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$onComplete$1()}, serializedLambda);
            }
        });
    }

    private static final String dataToString$1(ConcurrentLinkedQueue collectedData$4) {
        return ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)collectedData$4).asScala()).map((Function1 & Serializable & scala.Serializable)x$9 -> new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x$9)).mkString("[", ",", "]"), Iterable$.MODULE$.canBuildFrom())).mkString("{", ", ", "}");
    }

    public static final /* synthetic */ void $anonfun$new$80(ConcurrentLinkedQueue collectedData$4, RDD rdd, Time time) {
        String[] data = (String[])rdd.map((Function1 & Serializable & scala.Serializable)x$10 -> (String)x$10._2(), ClassTag$.MODULE$.apply(String.class)).collect();
        collectedData$4.add(data);
    }

    public static final /* synthetic */ boolean $anonfun$new$84(long expectedSize$1, String[] x$11) {
        return (long)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x$11)).size() == expectedSize$1;
    }

    public static final /* synthetic */ Assertion $anonfun$new$82(DirectKafkaStreamSuite $this, ConcurrentLinkedQueue collectedData$4, ConstantEstimator estimator$1, int batchIntervalMilliseconds$1, int rate) {
        collectedData$4.clear();
        estimator$1.updateRate(rate);
        long expectedSize = Math.round((double)(rate * batchIntervalMilliseconds$1) * 0.001);
        return (Assertion)$this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds())), $this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(((IterableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)collectedData$4).asScala()).exists((Function1 & Serializable & scala.Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)DirectKafkaStreamSuite.$anonfun$new$84(expectedSize, x$11))), "scala.collection.JavaConverters.collectionAsScalaIterableConverter[Array[String]](collectedData).asScala.exists(((x$11: Array[String]) => scala.Predef.refArrayOps[String](x$11).size.==(expectedSize)))", Prettifier$.MODULE$.default());
            return $this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(41).append(" - No arrays of size ").append(expectedSize).append(" for rate ").append(rate).append(" found in ").append(DirectKafkaStreamSuite.dataToString$1(collectedData$4)).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 616));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 613));
    }

    public DirectKafkaStreamSuite() {
        BeforeAndAfter.$init$((BeforeAndAfter)this);
        ScaledTimeSpans.$init$((ScaledTimeSpans)this);
        AbstractPatienceConfiguration.$init$((AbstractPatienceConfiguration)this);
        PatienceConfiguration.$init$((PatienceConfiguration)this);
        Eventually.$init$((Eventually)this);
        this.sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass().getSimpleName());
        this.after((Function0<Object>)(Function0 & Serializable & scala.Serializable)() -> {
            BoxedUnit boxedUnit;
            if (this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc() != null) {
                this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().stop(true);
            }
            if (this.testDir() != null) {
                Utils$.MODULE$.deleteRecursively(this.testDir());
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            return boxedUnit;
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 73));
        this.preferredHosts = LocationStrategies$.MODULE$.PreferConsistent();
        this.test("basic stream receiving with multiple topics and smallest starting offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            .colon.colon topics = new .colon.colon((Object)"basic1", (List)new .colon.colon((Object)"basic2", (List)new .colon.colon((Object)"basic3", (List)Nil$.MODULE$)));
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)7)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)9))}));
            topics.foreach((Function1 & Serializable & scala.Serializable)t -> {
                DirectKafkaStreamSuite.$anonfun$new$3(this, data, t);
                return BoxedUnit.UNIT;
            });
            scala.collection.immutable.Map offsets = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition("basic3", 0)), (Object)BoxesRunTime.boxToLong((long)2L))}));
            int expectedTotal = BoxesRunTime.unboxToInt((Object)data.values().sum((Numeric)Numeric.IntIsIntegral$.MODULE$)) * topics.size() - 2;
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(1000L)));
            InputDStream stream = (InputDStream)this.withClue("Error creating direct stream", () -> DirectKafkaStreamSuite.$anonfun$new$4(this, (List)topics, kafkaParams, offsets));
            ConcurrentLinkedQueue allReceived = new ConcurrentLinkedQueue();
            ObjectRef offsetRanges = ObjectRef.create((Object)((OffsetRange[])Array$.MODULE$.apply((Seq)Nil$.MODULE$, ClassTag$.MODULE$.apply(OffsetRange.class))));
            DStream tf = stream.transform((Function1 & Serializable & scala.Serializable)rdd -> {
                offsetRanges$1.elem = ((HasOffsetRanges)rdd).offsetRanges();
                return rdd.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class));
            }, ClassTag$.MODULE$.apply(Tuple2.class));
            tf.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                DirectKafkaStreamSuite.$anonfun$new$7(this, offsetRanges, rdd);
                return BoxedUnit.UNIT;
            });
            stream.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                allReceived.addAll(Arrays.asList((Object[])rdd.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class)).collect()));
                return BoxedUnit.UNIT;
            });
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(100000)).milliseconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(1000)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)allReceived.size()));
                int $org_scalatest_assert_macro_right = expectedTotal;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(50).append("didn't get expected number of messages, messages:\n").append(((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)allReceived).asScala()).mkString("\n")).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 149));
            }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 148));
            StreamingContext qual$1 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$1 = qual$1.stop$default$1();
            qual$1.stop(x$1);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 94));
        this.test("pattern based subscription", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            .colon.colon topics = new .colon.colon((Object)"pat1", (List)new .colon.colon((Object)"pat2", (List)new .colon.colon((Object)"pat3", (List)new .colon.colon((Object)"advanced3", (List)Nil$.MODULE$))));
            Pattern pat = new StringOps(Predef$.MODULE$.augmentString("pat\\d")).r().pattern();
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)7)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)9))}));
            topics.foreach((Function1 & Serializable & scala.Serializable)t -> {
                DirectKafkaStreamSuite.$anonfun$new$16(this, data, t);
                return BoxedUnit.UNIT;
            });
            scala.collection.immutable.Map offsets = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition("pat2", 0)), (Object)BoxesRunTime.boxToLong((long)3L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition("pat3", 0)), (Object)BoxesRunTime.boxToLong((long)4L))}));
            int expectedTotal = BoxesRunTime.unboxToInt((Object)data.values().sum((Numeric)Numeric.IntIsIntegral$.MODULE$)) * 3 - 7;
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(1000L)));
            InputDStream stream = (InputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> KafkaUtils$.MODULE$.createDirectStream(this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.SubscribePattern(pat, (Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala(), (Map)offsets)));
            ConcurrentLinkedQueue allReceived = new ConcurrentLinkedQueue();
            ObjectRef offsetRanges = ObjectRef.create((Object)((OffsetRange[])Array$.MODULE$.apply((Seq)Nil$.MODULE$, ClassTag$.MODULE$.apply(OffsetRange.class))));
            DStream tf = stream.transform((Function1 & Serializable & scala.Serializable)rdd -> {
                offsetRanges$2.elem = ((HasOffsetRanges)rdd).offsetRanges();
                return rdd.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class));
            }, ClassTag$.MODULE$.apply(Tuple2.class));
            tf.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                DirectKafkaStreamSuite.$anonfun$new$20(this, offsetRanges, rdd);
                return BoxedUnit.UNIT;
            });
            stream.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                allReceived.addAll(Arrays.asList((Object[])rdd.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class)).collect()));
                return BoxedUnit.UNIT;
            });
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(100000)).milliseconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(1000)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)allReceived.size()));
                int $org_scalatest_assert_macro_right = expectedTotal;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(50).append("didn't get expected number of messages, messages:\n").append(((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)allReceived).asScala()).mkString("\n")).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 215));
            }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 214));
            StreamingContext qual$2 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$2 = qual$2.stop$default$1();
            qual$2.stop(x$2);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 156));
        this.test("receiving from largest starting offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "latest";
            TopicPartition topicPartition = new TopicPartition(topic, 0);
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)10))}));
            this.kafkaTestUtils().createTopic(topic);
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"latest")}));
            KafkaConsumer kc = new KafkaConsumer(kafkaParams);
            kc.assign(Arrays.asList((Object[])new TopicPartition[]{topicPartition}));
            this.kafkaTestUtils().sendMessages(topic, data);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                long $org_scalatest_assert_macro_left = DirectKafkaStreamSuite.getLatestOffset$1(kc, topicPartition);
                int $org_scalatest_assert_macro_right = 3;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > (long)$org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 239));
            }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 238));
            long offsetBeforeStart = DirectKafkaStreamSuite.getLatestOffset$1(kc, topicPartition);
            kc.close();
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
            DirectKafkaInputDStream stream = (DirectKafkaInputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> {
                DirectKafkaInputDStream s = new DirectKafkaInputDStream(this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala()), (PerPartitionConfig)new DefaultPerPartitionConfig(this.sparkConf()));
                s.consumer().poll(0L);
                long $org_scalatest_assert_macro_left = s.consumer().position(topicPartition);
                long $org_scalatest_assert_macro_right = offsetBeforeStart;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Start offset not from latest", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 253));
                return s;
            });
            ConcurrentLinkedQueue collectedData = new ConcurrentLinkedQueue();
            stream.map((Function1 & Serializable & scala.Serializable)x$1 -> (String)x$1.value(), ClassTag$.MODULE$.apply(String.class)).foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                collectedData.addAll(Arrays.asList((Object[])rdd.collect()));
                return BoxedUnit.UNIT;
            });
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            scala.collection.immutable.Map newData = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)10))}));
            this.kafkaTestUtils().sendMessages(topic, newData);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> collectedData.contains("b"), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 267));
            ConcurrentLinkedQueue $org_scalatest_assert_macro_left = collectedData;
            String $org_scalatest_assert_macro_right = "a";
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.binaryMacroBool($org_scalatest_assert_macro_left, "contains", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.contains($org_scalatest_assert_macro_right), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 270));
            StreamingContext qual$3 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$3 = qual$3.stop$default$1();
            qual$3.stop(x$3);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 223));
        this.ignore("creating stream by offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "offset";
            TopicPartition topicPartition = new TopicPartition(topic, 0);
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)10))}));
            this.kafkaTestUtils().createTopic(topic);
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"latest")}));
            KafkaConsumer kc = new KafkaConsumer(kafkaParams);
            kc.assign(Arrays.asList((Object[])new TopicPartition[]{topicPartition}));
            this.kafkaTestUtils().sendMessages(topic, data);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                long $org_scalatest_assert_macro_left = DirectKafkaStreamSuite.getLatestOffset$2(kc, topicPartition);
                int $org_scalatest_assert_macro_right = 10;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= (long)$org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 291));
            }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 290));
            long offsetBeforeStart = DirectKafkaStreamSuite.getLatestOffset$2(kc, topicPartition);
            kc.close();
            kafkaParams.put("auto.offset.reset", "none");
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
            DirectKafkaInputDStream stream = (DirectKafkaInputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> {
                DirectKafkaInputDStream s = new DirectKafkaInputDStream(this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Assign((Iterable)new .colon.colon((Object)topicPartition, (List)Nil$.MODULE$), (Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala(), (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)BoxesRunTime.boxToLong((long)11L))}))), (PerPartitionConfig)new DefaultPerPartitionConfig(this.sparkConf()));
                s.consumer().poll(0L);
                long $org_scalatest_assert_macro_left = s.consumer().position(topicPartition);
                long $org_scalatest_assert_macro_right = offsetBeforeStart;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Start offset not from latest", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 309));
                return s;
            });
            ConcurrentLinkedQueue collectedData = new ConcurrentLinkedQueue();
            stream.map((Function1 & Serializable & scala.Serializable)x$2 -> (String)x$2.value(), ClassTag$.MODULE$.apply(String.class)).foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                collectedData.addAll(Arrays.asList((Object[])rdd.collect()));
                return BoxedUnit.UNIT;
            });
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            scala.collection.immutable.Map newData = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)10))}));
            this.kafkaTestUtils().sendMessages(topic, newData);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> collectedData.contains("b"), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 321));
            ConcurrentLinkedQueue $org_scalatest_assert_macro_left = collectedData;
            String $org_scalatest_assert_macro_right = "a";
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.binaryMacroBool($org_scalatest_assert_macro_left, "contains", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.contains($org_scalatest_assert_macro_right), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 324));
            StreamingContext qual$4 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$4 = qual$4.stop$default$1();
            qual$4.stop(x$4);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 275));
        this.ignore("offset recovery", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "recovery";
            this.kafkaTestUtils().createTopic(topic);
            this.testDir_$eq(Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2()));
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(100L)));
            InputDStream kafkaStream = (InputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> KafkaUtils$.MODULE$.createDirectStream(this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala())));
            DStream keyedStream = kafkaStream.map((Function1 & Serializable & scala.Serializable)r -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"key"), (Object)BoxesRunTime.boxToInteger((int)new StringOps(Predef$.MODULE$.augmentString((String)r.value())).toInt())), ClassTag$.MODULE$.apply(Tuple2.class));
            DStream stateStream = DStream$.MODULE$.toPairDStreamFunctions(keyedStream, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Int(), (Ordering)Ordering.String$.MODULE$).updateStateByKey((Function2 & Serializable & scala.Serializable)(values, state) -> new Some((Object)BoxesRunTime.boxToInteger((int)(BoxesRunTime.unboxToInt((Object)values.sum((Numeric)Numeric.IntIsIntegral$.MODULE$)) + BoxesRunTime.unboxToInt((Object)state.getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 0))))), ClassTag$.MODULE$.Int());
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().checkpoint(this.testDir().getAbsolutePath());
            stateStream.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                DirectKafkaStreamSuite.$anonfun$new$47(rdd);
                return BoxedUnit.UNIT;
            });
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).grouped(4).foreach((Function1 & Serializable & scala.Serializable)i -> {
                this.sendData$1((Seq)i, topic);
                return BoxedUnit.UNIT;
            });
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToLong((long)DirectKafkaStreamSuite$.MODULE$.total().get()));
                int $org_scalatest_assert_macro_right = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).sum((Numeric)Numeric.IntIsIntegral$.MODULE$);
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 371));
            }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 370));
            StreamingContext qual$5 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$52 = qual$5.stop$default$1();
            qual$5.stop(x$52);
            Seq<Tuple2<Time, OffsetRange[]>> offsetRangesBeforeStop = this.getOffsetRanges((DStream)kafkaStream);
            int $org_scalatest_assert_macro_left = offsetRangesBeforeStop.size();
            int $org_scalatest_assert_macro_right = 1;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"No offset ranges generated", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 378));
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])((Tuple2)offsetRangesBeforeStop.head())._2())).forall((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)DirectKafkaStreamSuite.$anonfun$new$51(this, x$5))), "scala.Predef.refArrayOps[org.apache.spark.streaming.kafka09.OffsetRange](offsetRangesBeforeStop.head._2).forall(((x$5: org.apache.spark.streaming.kafka09.OffsetRange) => DirectKafkaStreamSuite.this.convertToEqualizer[Long](x$5.fromOffset).===(0)(scalactic.this.Equality.default[Long])))", Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"starting offset not zero", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 379));
            this.logInfo((Function0 & Serializable & scala.Serializable)() -> "====== RESTARTING ========");
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.testDir().getAbsolutePath()));
            DStream recoveredStream = (DStream)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().graph().getInputStreams())).head();
            Seq recoveredOffsetRanges = (Seq)this.getOffsetRanges(recoveredStream).map((Function1 & Serializable & scala.Serializable)x -> new Tuple2(x._1(), (Object)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x._2())).toSet()), Seq$.MODULE$.canBuildFrom());
            int $org_scalatest_assert_macro_left2 = recoveredOffsetRanges.size();
            int $org_scalatest_assert_macro_right2 = 0;
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left2), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2 > $org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"No offset ranges recovered", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 393));
            Seq earlierOffsetRanges = (Seq)offsetRangesBeforeStop.map((Function1 & Serializable & scala.Serializable)x -> new Tuple2(x._1(), (Object)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x._2())).toSet()), Seq$.MODULE$.canBuildFrom());
            Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.simpleMacroBool(recoveredOffsetRanges.forall((Function1 & Serializable & scala.Serializable)or -> BoxesRunTime.boxToBoolean((boolean)earlierOffsetRanges.contains((Object)new Tuple2(or._1(), or._2())))), "recoveredOffsetRanges.forall(((or: (org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka09.OffsetRange])) => earlierOffsetRanges.contains[(org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka09.OffsetRange])](scala.Tuple2.apply[org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka09.OffsetRange]](or._1, or._2))))", Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)new StringBuilder(57).append("Recovered ranges are not the same as the ones generated\n").append(earlierOffsetRanges).append("\n").append(recoveredOffsetRanges).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 395));
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(11), 20).grouped(4).foreach((Function1 & Serializable & scala.Serializable)i -> {
                this.sendData$1((Seq)i, topic);
                return BoxedUnit.UNIT;
            });
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToLong((long)DirectKafkaStreamSuite$.MODULE$.total().get()));
                int $org_scalatest_assert_macro_right = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 20).sum((Numeric)Numeric.IntIsIntegral$.MODULE$);
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 410));
            }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 409));
            StreamingContext qual$6 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$6 = qual$6.stop$default$1();
            qual$6.stop(x$6);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 329));
        this.ignore("offset recovery from kafka", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "recoveryfromkafka";
            this.kafkaTestUtils().createTopic(topic);
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest"), new Tuple2((Object)"enable.auto.commit", (Object)Predef$.MODULE$.boolean2Boolean(false))}));
            ConcurrentLinkedQueue collectedData = new ConcurrentLinkedQueue();
            HashMap committed = new HashMap();
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(100L)));
            this.withClue("Error creating direct stream", (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                InputDStream kafkaStream = KafkaUtils$.MODULE$.createDirectStream(this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala()));
                kafkaStream.foreachRDD((Function2 & Serializable & scala.Serializable)(rdd, time) -> {
                    DirectKafkaStreamSuite.$anonfun$new$64(this, collectedData, kafkaStream, committed, rdd, time);
                    return BoxedUnit.UNIT;
                });
            });
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).grouped(4).foreach((Function1 & Serializable & scala.Serializable)i -> {
                this.sendDataAndWaitForReceive$1((Seq)i, topic, collectedData);
                return BoxedUnit.UNIT;
            });
            StreamingContext qual$7 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$7 = qual$7.stop$default$1();
            qual$7.stop(x$7);
            HashMap $org_scalatest_assert_macro_left = committed;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.unaryMacroBool($org_scalatest_assert_macro_left, "isEmpty", $org_scalatest_assert_macro_left.isEmpty(), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 467));
            KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
            consumer.subscribe(Arrays.asList((Object[])new String[]{topic}));
            consumer.poll(0L);
            ((IterableLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(committed).asScala()).foreach((Function1 & Serializable & scala.Serializable)x0$3 -> {
                Tuple2 tuple2 = x0$3;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                TopicPartition k = (TopicPartition)tuple2._1();
                OffsetAndMetadata v = (OffsetAndMetadata)tuple2._2();
                long $org_scalatest_assert_macro_left = v.offset();
                int $org_scalatest_assert_macro_right = 0;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > (long)$org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 474));
                long $org_scalatest_assert_macro_left2 = consumer.position(k);
                long $org_scalatest_assert_macro_right2 = v.offset();
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left2), ">=", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2 >= $org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
                Assertion assertion = this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 475));
                return assertion;
            });
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 417));
        this.test("Direct Kafka stream report input information", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "report-test";
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)7)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)9))}));
            this.kafkaTestUtils().createTopic(topic);
            this.kafkaTestUtils().sendMessages(topic, data);
            int totalSent = BoxesRunTime.unboxToInt((Object)data.values().sum((Numeric)Numeric.IntIsIntegral$.MODULE$));
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
            InputInfoCollector collector = new InputInfoCollector();
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().addStreamingListener((StreamingListener)collector);
            InputDStream stream = (InputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> KafkaUtils$.MODULE$.createDirectStream(this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala())));
            ConcurrentLinkedQueue allReceived = new ConcurrentLinkedQueue();
            stream.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class)).foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                allReceived.addAll(Arrays.asList((Object[])rdd.collect()));
                return BoxedUnit.UNIT;
            });
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20000)).milliseconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(200)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)allReceived.size()));
                int $org_scalatest_assert_macro_right = totalSent;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(50).append("didn't get expected number of messages, messages:\n").append(((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)allReceived).asScala()).mkString("\n")).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 507));
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)collector.numRecordsSubmitted().get()));
                int $org_scalatest_assert_macro_right2 = totalSent;
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 512));
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left3 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)collector.numRecordsStarted().get()));
                int $org_scalatest_assert_macro_right3 = totalSent;
                Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left3.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 513));
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left4 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)collector.numRecordsCompleted().get()));
                int $org_scalatest_assert_macro_right4 = totalSent;
                Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left4, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), $org_scalatest_assert_macro_left4.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 514));
            }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 506));
            StreamingContext qual$8 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$8 = qual$8.stop$default$1();
            qual$8.stop(x$8);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 480));
        this.test("maxMessagesPerPartition with backpressure disabled", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            String topic = "maxMessagesPerPartitionBackpressureDisabled";
            this.kafkaTestUtils().createTopic(topic, 2);
            DirectKafkaInputDStream<String, String> kafkaStream = this.getDirectKafkaStream(topic, (Option<RateController>)None$.MODULE$, (Option<PerPartitionConfig>)None$.MODULE$);
            scala.collection.immutable.Map input = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)50L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)50L))}));
            scala.collection.immutable.Map $org_scalatest_assert_macro_left = (scala.collection.immutable.Map)kafkaStream.maxMessagesPerPartition(input).get();
            scala.collection.immutable.Map $org_scalatest_assert_macro_right = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)10L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)10L))}));
            scala.collection.immutable.Map map = $org_scalatest_assert_macro_left;
            scala.collection.immutable.Map map2 = $org_scalatest_assert_macro_right;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(map != null ? !map.equals(map2) : map2 != null), Prettifier$.MODULE$.default());
            return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 525));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 519));
        this.test("maxMessagesPerPartition with no lag", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            String topic = "maxMessagesPerPartitionNoLag";
            this.kafkaTestUtils().createTopic(topic, 2);
            Some rateController = new Some((Object)new ConstantRateController(0, new ConstantEstimator(100L), 100L));
            DirectKafkaInputDStream<String, String> kafkaStream = this.getDirectKafkaStream(topic, (Option<RateController>)rateController, (Option<PerPartitionConfig>)None$.MODULE$);
            scala.collection.immutable.Map input = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)0L))}));
            Option $org_scalatest_assert_macro_left = kafkaStream.maxMessagesPerPartition(input);
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left, "isEmpty", $org_scalatest_assert_macro_left.isEmpty(), Prettifier$.MODULE$.default());
            return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 536));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 529));
        this.test("maxMessagesPerPartition respects max rate", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            String topic = "maxMessagesPerPartitionRespectsMaxRate";
            this.kafkaTestUtils().createTopic(topic, 2);
            Some rateController = new Some((Object)new ConstantRateController(0, new ConstantEstimator(100L), 1000L));
            Some ppc = new Some((Object)new PerPartitionConfig(null, topic){
                private final String topic$5;

                public long maxRatePerPartition(TopicPartition tp) {
                    String string = tp.topic();
                    String string2 = this.topic$5;
                    return !(string != null ? !string.equals(string2) : string2 != null) && tp.partition() == 0 ? 50L : 100L;
                }
                {
                    this.topic$5 = topic$5;
                }
            });
            DirectKafkaInputDStream<String, String> kafkaStream = this.getDirectKafkaStream(topic, (Option<RateController>)rateController, (Option<PerPartitionConfig>)ppc);
            scala.collection.immutable.Map input = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)1000L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)1000L))}));
            scala.collection.immutable.Map $org_scalatest_assert_macro_left = (scala.collection.immutable.Map)kafkaStream.maxMessagesPerPartition(input).get();
            scala.collection.immutable.Map $org_scalatest_assert_macro_right = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)5L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)10L))}));
            scala.collection.immutable.Map map = $org_scalatest_assert_macro_left;
            scala.collection.immutable.Map map2 = $org_scalatest_assert_macro_right;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(map != null ? !map.equals(map2) : map2 != null), Prettifier$.MODULE$.default());
            return this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 554));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 539));
        this.test("using rate controller", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "backpressure";
            this.kafkaTestUtils().createTopic(topic, 1);
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            HashMap<String, Object> executorKafkaParams = new HashMap<String, Object>(kafkaParams);
            KafkaUtils$.MODULE$.fixKafkaParams(executorKafkaParams);
            int batchIntervalMilliseconds = 500;
            ConstantEstimator estimator = new ConstantEstimator(100L);
            scala.collection.immutable.Map messages = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"foo"), (Object)BoxesRunTime.boxToInteger((int)5000))}));
            this.kafkaTestUtils().sendMessages(topic, messages);
            SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName(this.getClass().getSimpleName()).set("spark.streaming.kafka.maxRatePerPartition", "100");
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(sparkConf, Milliseconds$.MODULE$.apply((long)batchIntervalMilliseconds)));
            DStream kafkaStream = (DStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> new DirectKafkaInputDStream<String, String>(this, topic, kafkaParams, sparkConf, estimator){
                private final Some<DirectKafkaInputDStream.DirectKafkaRateController> rateController;

                public Some<DirectKafkaInputDStream.DirectKafkaRateController> rateController() {
                    return this.rateController;
                }
                {
                    this.rateController = new Some((Object)new DirectKafkaInputDStream.DirectKafkaRateController((DirectKafkaInputDStream)this, this.id(), (RateEstimator)estimator$1));
                }
            }.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class)));
            ConcurrentLinkedQueue collectedData = new ConcurrentLinkedQueue();
            kafkaStream.foreachRDD((Function2 & Serializable & scala.Serializable)(rdd, time) -> {
                DirectKafkaStreamSuite.$anonfun$new$80(collectedData, rdd, time);
                return BoxedUnit.UNIT;
            });
            this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc().start();
            ((IterableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 50, 20}))).foreach((Function1 & Serializable & scala.Serializable)rate -> DirectKafkaStreamSuite.$anonfun$new$82(this, collectedData, estimator, batchIntervalMilliseconds, BoxesRunTime.unboxToInt((Object)rate)));
            StreamingContext qual$9 = this.org$apache$spark$streaming$kafka09$DirectKafkaStreamSuite$$ssc();
            boolean x$9 = qual$9.stop$default$1();
            qual$9.stop(x$9);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 558));
    }

    public static class InputInfoCollector
    implements StreamingListener {
        private final AtomicLong numRecordsSubmitted;
        private final AtomicLong numRecordsStarted;
        private final AtomicLong numRecordsCompleted;

        public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {
            StreamingListener.onStreamingStarted$((StreamingListener)this, (StreamingListenerStreamingStarted)streamingStarted);
        }

        public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
            StreamingListener.onReceiverStarted$((StreamingListener)this, (StreamingListenerReceiverStarted)receiverStarted);
        }

        public void onReceiverError(StreamingListenerReceiverError receiverError) {
            StreamingListener.onReceiverError$((StreamingListener)this, (StreamingListenerReceiverError)receiverError);
        }

        public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
            StreamingListener.onReceiverStopped$((StreamingListener)this, (StreamingListenerReceiverStopped)receiverStopped);
        }

        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {
            StreamingListener.onOutputOperationStarted$((StreamingListener)this, (StreamingListenerOutputOperationStarted)outputOperationStarted);
        }

        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {
            StreamingListener.onOutputOperationCompleted$((StreamingListener)this, (StreamingListenerOutputOperationCompleted)outputOperationCompleted);
        }

        public AtomicLong numRecordsSubmitted() {
            return this.numRecordsSubmitted;
        }

        public AtomicLong numRecordsStarted() {
            return this.numRecordsStarted;
        }

        public AtomicLong numRecordsCompleted() {
            return this.numRecordsCompleted;
        }

        public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
            this.numRecordsSubmitted().addAndGet(batchSubmitted.batchInfo().numRecords());
        }

        public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
            this.numRecordsStarted().addAndGet(batchStarted.batchInfo().numRecords());
        }

        public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
            this.numRecordsCompleted().addAndGet(batchCompleted.batchInfo().numRecords());
        }

        public InputInfoCollector() {
            StreamingListener.$init$((StreamingListener)this);
            this.numRecordsSubmitted = new AtomicLong(0L);
            this.numRecordsStarted = new AtomicLong(0L);
            this.numRecordsCompleted = new AtomicLong(0L);
        }
    }
}

