/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.Time$;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.DStream$;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka.ConstantEstimator;
import org.apache.spark.streaming.kafka.ConstantRateController;
import org.apache.spark.streaming.kafka.DirectKafkaInputDStream;
import org.apache.spark.streaming.kafka.DirectKafkaStreamSuite$;
import org.apache.spark.streaming.kafka.DirectKafkaStreamSuite$$anonfun$1$;
import org.apache.spark.streaming.kafka.DirectKafkaStreamSuite$$anonfun$9$;
import org.apache.spark.streaming.kafka.DirectKafkaStreamSuite$$anonfun$9$$anonfun$24$;
import org.apache.spark.streaming.kafka.DirectKafkaStreamSuite$$anonfun$9$$anonfun$apply$mcV$sp$9$;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaRDD;
import org.apache.spark.streaming.kafka.KafkaTestUtils;
import org.apache.spark.streaming.kafka.KafkaUtils$;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.scheduler.RateController;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.apache.spark.streaming.scheduler.StreamingListenerStreamingStarted;
import org.apache.spark.streaming.scheduler.rate.RateEstimator;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.TripleEqualsSupport;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfter;
import org.scalatest.BeforeAndAfterAll;
import org.scalatest.FunSuiteLike;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.concurrent.AbstractPatienceConfiguration;
import org.scalatest.concurrent.AbstractPatienceConfiguration$PatienceConfig$;
import org.scalatest.concurrent.Eventually;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.concurrent.ScaledTimeSpans;
import org.scalatest.time.Span;
import org.scalatest.time.Span$;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.math.Numeric;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\t\rb\u0001B\u0001\u0003\u00015\u0011a\u0003R5sK\u000e$8*\u00194lCN#(/Z1n'VLG/\u001a\u0006\u0003\u0007\u0011\tQa[1gW\u0006T!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0004\u0001M1\u0001A\u0004\n\u00197\u0005\u0002\"a\u0004\t\u000e\u0003\u0019I!!\u0005\u0004\u0003\u001bM\u0003\u0018M]6Gk:\u001cV/\u001b;f!\t\u0019b#D\u0001\u0015\u0015\t)\"\"A\u0005tG\u0006d\u0017\r^3ti&\u0011q\u0003\u0006\u0002\u000f\u0005\u00164wN]3B]\u0012\fe\r^3s!\t\u0019\u0012$\u0003\u0002\u001b)\t\t\")\u001a4pe\u0016\fe\u000eZ!gi\u0016\u0014\u0018\t\u001c7\u0011\u0005qyR\"A\u000f\u000b\u0005y!\u0012AC2p]\u000e,(O]3oi&\u0011\u0001%\b\u0002\u000b\u000bZ,g\u000e^;bY2L\bC\u0001\u0012&\u001b\u0005\u0019#B\u0001\u0013\u0007\u0003!Ig\u000e^3s]\u0006d\u0017B\u0001\u0014$\u0005\u001daunZ4j]\u001eDQ\u0001\u000b\u0001\u0005\u0002%\na\u0001P5oSRtD#\u0001\u0016\u0011\u0005-\u0002Q\"\u0001\u0002\t\u000f5\u0002!\u0019!C\u0001]\u0005I1\u000f]1sW\u000e{gNZ\u000b\u0002_A\u0011q\u0002M\u0005\u0003c\u0019\u0011\u0011b\u00159be.\u001cuN\u001c4\t\rM\u0002\u0001\u0015!\u00030\u0003)\u0019\b/\u0019:l\u0007>tg\r\t\u0005\nk\u0001\u0001\r\u00111A\u0005\nY\n1a]:d+\u00059\u0004C\u0001\u001d:\u001b\u0005!\u0011B\u0001\u001e\u0005\u0005A\u0019FO]3b[&twmQ8oi\u0016DH\u000fC\u0005=\u0001\u0001\u0007\t\u0019!C\u0005{\u000591o]2`I\u0015\fHC\u0001 E!\ty$)D\u0001A\u0015\u0005\t\u0015!B:dC2\f\u0017BA\"A\u0005\u0011)f.\u001b;\t\u000f\u0015[\u0014\u0011!a\u0001o\u0005\u0019\u0001\u0010J\u0019\t\r\u001d\u0003\u0001\u0015)\u00038\u0003\u0011\u00198o\u0019\u0011\t\u0013%\u0003\u0001\u0019!a\u0001\n\u0013Q\u0015a\u0002;fgR$\u0015N]\u000b\u0002\u0017B\u0011A*U\u0007\u0002\u001b*\u0011ajT\u0001\u0003S>T\u0011\u0001U\u0001\u0005U\u00064\u0018-\u0003\u0002S\u001b\n!a)\u001b7f\u0011%!\u0006\u00011AA\u0002\u0013%Q+A\u0006uKN$H)\u001b:`I\u0015\fHC\u0001 W\u0011\u001d)5+!AA\u0002-Ca\u0001\u0017\u0001!B\u0013Y\u0015\u0001\u0003;fgR$\u0015N\u001d\u0011\t\u0013i\u0003\u0001\u0019!a\u0001\n\u0013Y\u0016AD6bM.\fG+Z:u+RLGn]\u000b\u00029B\u00111&X\u0005\u0003=\n\u0011abS1gW\u0006$Vm\u001d;Vi&d7\u000fC\u0005a\u0001\u0001\u0007\t\u0019!C\u0005C\u0006\u00112.\u00194lCR+7\u000f^+uS2\u001cx\fJ3r)\tq$\rC\u0004F?\u0006\u0005\t\u0019\u0001/\t\r\u0011\u0004\u0001\u0015)\u0003]\u0003=Y\u0017MZ6b)\u0016\u001cH/\u0016;jYN\u0004\u0003\"\u00024\u0001\t\u0003:\u0017!\u00032fM>\u0014X-\u00117m)\u0005q\u0004\"B5\u0001\t\u0003:\u0017\u0001C1gi\u0016\u0014\u0018\t\u001c7\t\u000b-\u0004A\u0011\u00027\u0002\u001f\u001d,Go\u00144gg\u0016$(+\u00198hKN,R!\\A\u0012\u0003o!2A\\A\u0007!\rywO\u001f\b\u0003aVt!!\u001d;\u000e\u0003IT!a\u001d\u0007\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0015B\u0001<A\u0003\u001d\u0001\u0018mY6bO\u0016L!\u0001_=\u0003\u0007M+\u0017O\u0003\u0002w\u0001B)qh_?\u0002\u0002%\u0011A\u0010\u0011\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u0005ar\u0018BA@\u0005\u0005\u0011!\u0016.\\3\u0011\u000b}\n\u0019!a\u0002\n\u0007\u0005\u0015\u0001IA\u0003BeJ\f\u0017\u0010E\u0002,\u0003\u0013I1!a\u0003\u0003\u0005-yeMZ:fiJ\u000bgnZ3\t\u000f\u0005=!\u000e1\u0001\u0002\u0012\u0005Y1.\u00194lCN#(/Z1n!\u0019\t\u0019\"!\u0007\u0002\u001e5\u0011\u0011Q\u0003\u0006\u0004\u0003/!\u0011a\u00023tiJ,\u0017-\\\u0005\u0005\u00037\t)BA\u0004E'R\u0014X-Y7\u0011\r}Z\u0018qDA\u001b!\u0011\t\t#a\t\r\u0001\u00119\u0011Q\u00056C\u0002\u0005\u001d\"!A&\u0012\t\u0005%\u0012q\u0006\t\u0004\u007f\u0005-\u0012bAA\u0017\u0001\n9aj\u001c;iS:<\u0007cA \u00022%\u0019\u00111\u0007!\u0003\u0007\u0005s\u0017\u0010\u0005\u0003\u0002\"\u0005]BaBA\u001dU\n\u0007\u0011q\u0005\u0002\u0002-\"9\u0011Q\b\u0001\u0005\n\u0005}\u0012\u0001F4fi\u0012K'/Z2u\u0017\u000647.Y*ue\u0016\fW\u000e\u0006\u0004\u0002B\u0005\u0015\u0014\u0011\u000e\t\u000eW\u0005\r\u0013qIA$\u0003+\n)&a\u0019\n\u0007\u0005\u0015#AA\fESJ,7\r^&bM.\f\u0017J\u001c9vi\u0012\u001bFO]3b[B!\u0011\u0011JA(\u001d\ry\u00141J\u0005\u0004\u0003\u001b\u0002\u0015A\u0002)sK\u0012,g-\u0003\u0003\u0002R\u0005M#AB*ue&twMC\u0002\u0002N\u0001\u0003B!a\u0016\u0002`5\u0011\u0011\u0011\f\u0006\u0005\u00037\ni&\u0001\u0006tKJL\u0017\r\\5{KJT\u0011aA\u0005\u0005\u0003C\nIFA\u0007TiJLgn\u001a#fG>$WM\u001d\t\u0007\u007fm\f9%a\u0012\t\u0011\u0005\u001d\u00141\ba\u0001\u0003\u000f\nQ\u0001^8qS\u000eD\u0001\"a\u001b\u0002<\u0001\u0007\u0011QN\u0001\u0013[>\u001c7NU1uK\u000e{g\u000e\u001e:pY2,'\u000fE\u0003@\u0003_\n\u0019(C\u0002\u0002r\u0001\u0013aa\u00149uS>t\u0007\u0003BA;\u0003wj!!a\u001e\u000b\u0007\u0005eD!A\u0005tG\",G-\u001e7fe&!\u0011QPA<\u00059\u0011\u0016\r^3D_:$(o\u001c7mKJ<q!!!\u0003\u0011\u0003\t\u0019)\u0001\fESJ,7\r^&bM.\f7\u000b\u001e:fC6\u001cV/\u001b;f!\rY\u0013Q\u0011\u0004\u0007\u0003\tA\t!a\"\u0014\r\u0005\u0015\u0015\u0011RAH!\ry\u00141R\u0005\u0004\u0003\u001b\u0003%AB!osJ+g\rE\u0002@\u0003#K1!a%A\u00051\u0019VM]5bY&T\u0018M\u00197f\u0011\u001dA\u0013Q\u0011C\u0001\u0003/#\"!a!\t\u0015\u0005m\u0015Q\u0011b\u0001\n\u0003\ti*A\u0003u_R\fG.\u0006\u0002\u0002 B!\u0011\u0011UAW\u001b\t\t\u0019K\u0003\u0003\u0002&\u0006\u001d\u0016AB1u_6L7MC\u0002\u001f\u0003SS1!a+P\u0003\u0011)H/\u001b7\n\t\u0005=\u00161\u0015\u0002\u000b\u0003R|W.[2M_:<\u0007\"CAZ\u0003\u000b\u0003\u000b\u0011BAP\u0003\u0019!x\u000e^1mA\u00199\u0011qWAC\u0001\u0005e&AE%oaV$\u0018J\u001c4p\u0007>dG.Z2u_J\u001cb!!.\u0002\n\u0006m\u0006\u0003BA;\u0003{KA!a0\u0002x\t\t2\u000b\u001e:fC6Lgn\u001a'jgR,g.\u001a:\t\u000f!\n)\f\"\u0001\u0002DR\u0011\u0011Q\u0019\t\u0005\u0003\u000f\f),\u0004\u0002\u0002\u0006\"Q\u00111ZA[\u0005\u0004%\t!!(\u0002'9,XNU3d_J$7oU;c[&$H/\u001a3\t\u0013\u0005=\u0017Q\u0017Q\u0001\n\u0005}\u0015\u0001\u00068v[J+7m\u001c:egN+(-\\5ui\u0016$\u0007\u0005\u0003\u0006\u0002T\u0006U&\u0019!C\u0001\u0003;\u000b\u0011C\\;n%\u0016\u001cwN\u001d3t'R\f'\u000f^3e\u0011%\t9.!.!\u0002\u0013\ty*\u0001\nok6\u0014VmY8sIN\u001cF/\u0019:uK\u0012\u0004\u0003BCAn\u0003k\u0013\r\u0011\"\u0001\u0002\u001e\u0006\u0019b.^7SK\u000e|'\u000fZ:D_6\u0004H.\u001a;fI\"I\u0011q\\A[A\u0003%\u0011qT\u0001\u0015]Vl'+Z2pe\u0012\u001c8i\\7qY\u0016$X\r\u001a\u0011\t\u0011\u0005\r\u0018Q\u0017C!\u0003K\f\u0001c\u001c8CCR\u001c\u0007nU;c[&$H/\u001a3\u0015\u0007y\n9\u000f\u0003\u0005\u0002j\u0006\u0005\b\u0019AAv\u00039\u0011\u0017\r^2i'V\u0014W.\u001b;uK\u0012\u0004B!!\u001e\u0002n&!\u0011q^A<\u0005}\u0019FO]3b[&tw\rT5ti\u0016tWM\u001d\"bi\u000eD7+\u001e2nSR$X\r\u001a\u0005\t\u0003g\f)\f\"\u0011\u0002v\u0006qqN\u001c\"bi\u000eD7\u000b^1si\u0016$Gc\u0001 \u0002x\"A\u0011\u0011`Ay\u0001\u0004\tY0\u0001\u0007cCR\u001c\u0007n\u0015;beR,G\r\u0005\u0003\u0002v\u0005u\u0018\u0002BA\u0000\u0003o\u0012Qd\u0015;sK\u0006l\u0017N\\4MSN$XM\\3s\u0005\u0006$8\r[*uCJ$X\r\u001a\u0005\t\u0005\u0007\t)\f\"\u0011\u0003\u0006\u0005\u0001rN\u001c\"bi\u000eD7i\\7qY\u0016$X\r\u001a\u000b\u0004}\t\u001d\u0001\u0002\u0003B\u0005\u0005\u0003\u0001\rAa\u0003\u0002\u001d\t\fGo\u00195D_6\u0004H.\u001a;fIB!\u0011Q\u000fB\u0007\u0013\u0011\u0011y!a\u001e\u0003?M#(/Z1nS:<G*[:uK:,'OQ1uG\"\u001cu.\u001c9mKR,G\r\u0003\u0006\u0003\u0014\u0005\u0015\u0015\u0011!C\u0005\u0005+\t1B]3bIJ+7o\u001c7wKR\u0011!q\u0003\t\u0005\u00053\u0011y\"\u0004\u0002\u0003\u001c)\u0019!QD(\u0002\t1\fgnZ\u0005\u0005\u0005C\u0011YB\u0001\u0004PE*,7\r\u001e")
public class DirectKafkaStreamSuite
extends SparkFunSuite
implements BeforeAndAfter,
Eventually {
    private final SparkConf sparkConf;
    private StreamingContext org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc;
    private File org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir;
    private KafkaTestUtils org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils;
    private final AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    private volatile AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig$module;

    public static AtomicLong total() {
        return DirectKafkaStreamSuite$.MODULE$.total();
    }

    public <T> T eventually(PatienceConfiguration.Timeout timeout, PatienceConfiguration.Interval interval, Function0<T> fun) {
        return (T)Eventually.class.eventually((Eventually)this, (PatienceConfiguration.Timeout)timeout, (PatienceConfiguration.Interval)interval, fun);
    }

    public <T> T eventually(PatienceConfiguration.Timeout timeout, Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config) {
        return (T)Eventually.class.eventually((Eventually)this, (PatienceConfiguration.Timeout)timeout, fun, (AbstractPatienceConfiguration.PatienceConfig)config);
    }

    public <T> T eventually(PatienceConfiguration.Interval interval, Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config) {
        return (T)Eventually.class.eventually((Eventually)this, (PatienceConfiguration.Interval)interval, fun, (AbstractPatienceConfiguration.PatienceConfig)config);
    }

    public <T> T eventually(Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config) {
        return (T)Eventually.class.eventually((Eventually)this, fun, (AbstractPatienceConfiguration.PatienceConfig)config);
    }

    public AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig() {
        return this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    }

    public void org$scalatest$concurrent$PatienceConfiguration$_setter_$org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig_$eq(AbstractPatienceConfiguration.PatienceConfig x$1) {
        this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig = x$1;
    }

    public AbstractPatienceConfiguration.PatienceConfig patienceConfig() {
        return PatienceConfiguration.class.patienceConfig((PatienceConfiguration)this);
    }

    public PatienceConfiguration.Timeout timeout(Span value) {
        return PatienceConfiguration.class.timeout((PatienceConfiguration)this, (Span)value);
    }

    public PatienceConfiguration.Interval interval(Span value) {
        return PatienceConfiguration.class.interval((PatienceConfiguration)this, (Span)value);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig$lzycompute() {
        DirectKafkaStreamSuite directKafkaStreamSuite = this;
        synchronized (directKafkaStreamSuite) {
            if (this.PatienceConfig$module != null) return this.PatienceConfig$module;
            this.PatienceConfig$module = new AbstractPatienceConfiguration$PatienceConfig$((AbstractPatienceConfiguration)this);
            return this.PatienceConfig$module;
        }
    }

    public AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig() {
        return this.PatienceConfig$module == null ? this.PatienceConfig$lzycompute() : this.PatienceConfig$module;
    }

    public final Span scaled(Span span) {
        return ScaledTimeSpans.class.scaled((ScaledTimeSpans)this, (Span)span);
    }

    public double spanScaleFactor() {
        return ScaledTimeSpans.class.spanScaleFactor((ScaledTimeSpans)this);
    }

    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        return this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean x$1) {
        this.org$scalatest$BeforeAndAfter$$runHasBeenInvoked = x$1;
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$runTest(String testName, Args args) {
        return FunSuiteLike.class.runTest((FunSuiteLike)this, (String)testName, (Args)args);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfter$$super$run(Option testName, Args args) {
        return BeforeAndAfterAll.class.run((BeforeAndAfterAll)this, (Option)testName, (Args)args);
    }

    public void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference x$1) {
        this.org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = x$1;
    }

    public void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference x$1) {
        this.org$scalatest$BeforeAndAfter$$afterFunctionAtomic = x$1;
    }

    public void before(Function0<Object> fun) {
        BeforeAndAfter.class.before((BeforeAndAfter)this, fun);
    }

    public void after(Function0<Object> fun) {
        BeforeAndAfter.class.after((BeforeAndAfter)this, fun);
    }

    public Status runTest(String testName, Args args) {
        return BeforeAndAfter.class.runTest((BeforeAndAfter)this, (String)testName, (Args)args);
    }

    public Status run(Option<String> testName, Args args) {
        return BeforeAndAfter.class.run((BeforeAndAfter)this, testName, (Args)args);
    }

    public SparkConf sparkConf() {
        return this.sparkConf;
    }

    public StreamingContext org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc() {
        return this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc;
    }

    public void org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(StreamingContext x$1) {
        this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc = x$1;
    }

    public File org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir() {
        return this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir;
    }

    public void org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir_$eq(File x$1) {
        this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir = x$1;
    }

    public KafkaTestUtils org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils() {
        return this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils;
    }

    private void org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils_$eq(KafkaTestUtils x$1) {
        this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils = x$1;
    }

    public void beforeAll() {
        this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils_$eq(new KafkaTestUtils());
        this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().setup();
    }

    public void afterAll() {
        if (this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils() != null) {
            this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().teardown();
            this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils_$eq(null);
        }
    }

    public <K, V> Seq<Tuple2<Time, OffsetRange[]>> org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$getOffsetRanges(DStream<Tuple2<K, V>> kafkaStream) {
        return (Seq)kafkaStream.generatedRDDs().mapValues((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final OffsetRange[] apply(RDD<Tuple2<K, V>> rdd) {
                return ((KafkaRDD)rdd).offsetRanges();
            }
        }).toSeq().sortBy((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Time apply(Tuple2<Time, OffsetRange[]> x$8) {
                return (Time)x$8._1();
            }
        }, Time$.MODULE$.ordering());
    }

    public DirectKafkaInputDStream<String, String, StringDecoder, StringDecoder, Tuple2<String, String>> org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$getDirectKafkaStream(String topic, Option<RateController> mockRateController) {
        int batchIntervalMilliseconds = 100;
        SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName(((Object)((Object)this)).getClass().getSimpleName()).set("spark.streaming.kafka.maxRatePerPartition", "100");
        this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(sparkConf, Milliseconds$.MODULE$.apply((long)batchIntervalMilliseconds)));
        Map earliestOffsets = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)0L))}));
        Serializable messageHandler = new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Tuple2<String, String> apply(MessageAndMetadata<String, String> mmd) {
                return new Tuple2(mmd.key(), mmd.message());
            }
        };
        return new DirectKafkaInputDStream<String, String, StringDecoder, StringDecoder, Tuple2<String, String>>(this, mockRateController, earliestOffsets, (Function1)messageHandler){
            private final Option<RateController> rateController;

            public Option<RateController> rateController() {
                return this.rateController;
            }
            {
                this.rateController = mockRateController$1;
            }
        };
    }

    public DirectKafkaStreamSuite() {
        BeforeAndAfter.class.$init$((BeforeAndAfter)this);
        ScaledTimeSpans.class.$init$((ScaledTimeSpans)this);
        AbstractPatienceConfiguration.class.$init$((AbstractPatienceConfiguration)this);
        PatienceConfiguration.class.$init$((PatienceConfiguration)this);
        Eventually.class.$init$((Eventually)this);
        this.sparkConf = new SparkConf().setMaster("local[4]").setAppName(((Object)((Object)this)).getClass().getSimpleName());
        this.after((Function0<Object>)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final Object apply() {
                BoxedUnit boxedUnit;
                if (this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc() != null) {
                    this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().stop(true);
                }
                if (this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir() == null) {
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    Utils$.MODULE$.deleteRecursively(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir());
                    boxedUnit = BoxedUnit.UNIT;
                }
                return boxedUnit;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("basic stream receiving with multiple topics and smallest starting offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                Set topics = (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"basic1", "basic2", "basic3"}));
                Map data = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)7)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)9))}));
                topics.foreach((Function1)new Serializable(this, data){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$1 $outer;
                    private final Map data$1;

                    public final void apply(String t) {
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().createTopic(t);
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().sendMessages(t, this.data$1);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.data$1 = data$1;
                    }
                });
                int totalSent = BoxesRunTime.unboxToInt((Object)data.values().sum((Numeric)Numeric.IntIsIntegral$.MODULE$)) * topics.size();
                Map kafkaParams = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.broker.list"), (Object)this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().brokerAddress()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"smallest")}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.$outer.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
                InputDStream stream = (InputDStream)this.$outer.withClue("Error creating direct stream", (Function0)new Serializable(this, topics, kafkaParams){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$1 $outer;
                    private final Set topics$1;
                    private final Map kafkaParams$1;

                    public final InputDStream<Tuple2<String, String>> apply() {
                        return KafkaUtils$.MODULE$.createDirectStream(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc(), this.kafkaParams$1, this.topics$1, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topics$1 = topics$1;
                        this.kafkaParams$1 = kafkaParams$1;
                    }
                });
                ConcurrentLinkedQueue<E> allReceived = new ConcurrentLinkedQueue<E>();
                ObjectRef offsetRanges = ObjectRef.create((Object)((OffsetRange[])Array$.MODULE$.apply((Seq)Nil$.MODULE$, ClassTag$.MODULE$.apply(OffsetRange.class))));
                stream.transform((Function1)new Serializable(this, offsetRanges){
                    public static final long serialVersionUID = 0L;
                    private final ObjectRef offsetRanges$1;

                    public final RDD<Tuple2<String, String>> apply(RDD<Tuple2<String, String>> rdd) {
                        this.offsetRanges$1.elem = ((HasOffsetRanges)rdd).offsetRanges();
                        return rdd;
                    }
                    {
                        this.offsetRanges$1 = offsetRanges$1;
                    }
                }, ClassTag$.MODULE$.apply(Tuple2.class)).foreachRDD((Function1)new Serializable(this, offsetRanges){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$1 $outer;
                    public final ObjectRef offsetRanges$1;

                    public final void apply(RDD<Tuple2<String, String>> rdd) {
                        Predef$.MODULE$.refArrayOps((Object[])((OffsetRange[])this.offsetRanges$1.elem)).foreach((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ $anonfun$1$$anonfun$apply$mcV$sp$12 $outer;

                            public final void apply(OffsetRange o) {
                                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().logInfo((Function0)new Serializable(this, o){
                                    public static final long serialVersionUID = 0L;
                                    private final OffsetRange o$1;

                                    public final String apply() {
                                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", " ", " ", " ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.o$1.topic(), BoxesRunTime.boxToInteger((int)this.o$1.partition()), BoxesRunTime.boxToLong((long)this.o$1.fromOffset()), BoxesRunTime.boxToLong((long)this.o$1.untilOffset())}));
                                    }
                                    {
                                        this.o$1 = o$1;
                                    }
                                });
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                            }
                        });
                        Tuple2[] collected = (Tuple2[])rdd.mapPartitionsWithIndex((Function2)new Serializable(this){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ $anonfun$1$$anonfun$apply$mcV$sp$12 $outer;

                            public final Iterator<Tuple2<Object, Object>> apply(int i, Iterator<Tuple2<String, String>> iter) {
                                OffsetRange off = ((OffsetRange[])this.$outer.offsetRanges$1.elem)[i];
                                Seq all = iter.toSeq();
                                int partSize = all.size();
                                long rangeSize = off.untilOffset() - off.fromOffset();
                                return scala.package$.MODULE$.Iterator().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2.mcIJ.sp(partSize, rangeSize)}));
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                            }
                        }, rdd.mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).collect();
                        Predef$.MODULE$.refArrayOps((Object[])collected).foreach((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ $anonfun$1$$anonfun$apply$mcV$sp$12 $outer;

                            public final void apply(Tuple2<Object, Object> x0$1) {
                                Tuple2<Object, Object> tuple2 = x0$1;
                                if (tuple2 != null) {
                                    int partSize = tuple2._1$mcI$sp();
                                    long rangeSize = tuple2._2$mcJ$sp();
                                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToInteger((int)partSize));
                                    long $org_scalatest_assert_macro_right = rangeSize;
                                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()));
                                    this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"offset ranges are wrong");
                                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                    return;
                                }
                                throw new MatchError(tuple2);
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                            }
                        });
                    }

                    public /* synthetic */ $anonfun$1 org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.offsetRanges$1 = offsetRanges$1;
                    }
                });
                stream.foreachRDD((Function1)new Serializable(this, allReceived){
                    public static final long serialVersionUID = 0L;
                    private final ConcurrentLinkedQueue allReceived$1;

                    public final void apply(RDD<Tuple2<String, String>> rdd) {
                        this.allReceived$1.addAll(Arrays.asList((Object[])rdd.collect()));
                    }
                    {
                        this.allReceived$1 = allReceived$1;
                    }
                });
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().start();
                this.$outer.eventually(this.$outer.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20000)).milliseconds())), this.$outer.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(200)).milliseconds())), new Serializable(this, totalSent, allReceived){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$1 $outer;
                    private final int totalSent$1;
                    private final ConcurrentLinkedQueue allReceived$1;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToInteger((int)this.allReceived$1.size()));
                        int $org_scalatest_assert_macro_right = this.totalSent$1;
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()));
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder().append((Object)"didn't get expected number of messages, messages:\n").append((Object)((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)this.allReceived$1).asScala()).mkString("\n")).toString());
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.totalSent$1 = totalSent$1;
                        this.allReceived$1 = allReceived$1;
                    }
                });
                StreamingContext qual$1 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc();
                boolean x$9 = qual$1.stop$default$1();
                qual$1.stop(x$9);
            }

            public /* synthetic */ DirectKafkaStreamSuite org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("receiving from largest starting offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                String topic = "largest";
                TopicAndPartition topicPartition = new TopicAndPartition(topic, 0);
                Map data = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)10))}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().createTopic(topic);
                Map kafkaParams = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.broker.list"), (Object)this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().brokerAddress()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"largest")}));
                KafkaCluster kc = new KafkaCluster(kafkaParams);
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().sendMessages(topic, data);
                this.$outer.eventually(this.$outer.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.$outer.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).milliseconds())), new Serializable(this, topicPartition, kc){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$2 $outer;
                    private final TopicAndPartition topicPartition$1;
                    private final KafkaCluster kc$1;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        long $org_scalatest_assert_macro_left = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$getLatestOffset$1(this.topicPartition$1, this.kc$1);
                        int $org_scalatest_assert_macro_right = 3;
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > (long)$org_scalatest_assert_macro_right);
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topicPartition$1 = topicPartition$1;
                        this.kc$1 = kc$1;
                    }
                });
                long offsetBeforeStart = this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$getLatestOffset$1(topicPartition, kc);
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.$outer.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
                InputDStream stream = (InputDStream)this.$outer.withClue("Error creating direct stream", (Function0)new Serializable(this, topic, kafkaParams){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$2 $outer;
                    private final String topic$1;
                    private final Map kafkaParams$2;

                    public final InputDStream<Tuple2<String, String>> apply() {
                        return KafkaUtils$.MODULE$.createDirectStream(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc(), this.kafkaParams$2, (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.topic$1})), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topic$1 = topic$1;
                        this.kafkaParams$2 = kafkaParams$2;
                    }
                });
                long $org_scalatest_assert_macro_left = BoxesRunTime.unboxToLong((Object)((DirectKafkaInputDStream)stream).fromOffsets().apply((Object)topicPartition));
                long $org_scalatest_assert_macro_right = offsetBeforeStart;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right);
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Start offset not from latest");
                ConcurrentLinkedQueue<E> collectedData = new ConcurrentLinkedQueue<E>();
                stream.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(Tuple2<String, String> x$1) {
                        return (String)x$1._2();
                    }
                }, ClassTag$.MODULE$.apply(String.class)).foreachRDD((Function1)new Serializable(this, collectedData){
                    public static final long serialVersionUID = 0L;
                    private final ConcurrentLinkedQueue collectedData$1;

                    public final void apply(RDD<String> rdd) {
                        this.collectedData$1.addAll(Arrays.asList((Object[])rdd.collect()));
                    }
                    {
                        this.collectedData$1 = collectedData$1;
                    }
                });
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().start();
                Map newData = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)10))}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().sendMessages(topic, newData);
                this.$outer.eventually(this.$outer.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.$outer.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), new Serializable(this, collectedData){
                    public static final long serialVersionUID = 0L;
                    private final ConcurrentLinkedQueue collectedData$1;

                    public final boolean apply() {
                        return this.apply$mcZ$sp();
                    }

                    public boolean apply$mcZ$sp() {
                        return this.collectedData$1.contains("b");
                    }
                    {
                        this.collectedData$1 = collectedData$1;
                    }
                });
                ConcurrentLinkedQueue<E> $org_scalatest_assert_macro_left2 = collectedData;
                String $org_scalatest_assert_macro_right2 = "a";
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.notBool(Bool$.MODULE$.binaryMacroBool($org_scalatest_assert_macro_left2, "contains", (Object)$org_scalatest_assert_macro_right2, $org_scalatest_assert_macro_left2.contains($org_scalatest_assert_macro_right2)));
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"");
                StreamingContext qual$2 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc();
                boolean x$10 = qual$2.stop$default$1();
                qual$2.stop(x$10);
            }

            public /* synthetic */ DirectKafkaStreamSuite org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer() {
                return this.$outer;
            }

            public final long org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$getLatestOffset$1(TopicAndPartition topicPartition$1, KafkaCluster kc$1) {
                return ((KafkaCluster.LeaderOffset)((MapLike)kc$1.getLatestLeaderOffsets((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicAndPartition[]{topicPartition$1}))).right().get()).apply((Object)topicPartition$1)).offset();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("creating stream by offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                String topic = "offset";
                TopicAndPartition topicPartition = new TopicAndPartition(topic, 0);
                Map data = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)10))}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().createTopic(topic);
                Map kafkaParams = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.broker.list"), (Object)this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().brokerAddress()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"largest")}));
                KafkaCluster kc = new KafkaCluster(kafkaParams);
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().sendMessages(topic, data);
                this.$outer.eventually(this.$outer.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.$outer.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).milliseconds())), new Serializable(this, topicPartition, kc){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$3 $outer;
                    private final TopicAndPartition topicPartition$2;
                    private final KafkaCluster kc$2;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        long $org_scalatest_assert_macro_left = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$getLatestOffset$2(this.topicPartition$2, this.kc$2);
                        int $org_scalatest_assert_macro_right = 10;
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= (long)$org_scalatest_assert_macro_right);
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topicPartition$2 = topicPartition$2;
                        this.kc$2 = kc$2;
                    }
                });
                long offsetBeforeStart = this.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$getLatestOffset$2(topicPartition, kc);
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.$outer.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
                InputDStream stream = (InputDStream)this.$outer.withClue("Error creating direct stream", (Function0)new Serializable(this, topicPartition, kafkaParams){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$3 $outer;
                    private final TopicAndPartition topicPartition$2;
                    private final Map kafkaParams$3;

                    public final InputDStream<String> apply() {
                        return KafkaUtils$.MODULE$.createDirectStream(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc(), this.kafkaParams$3, (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topicPartition$2), (Object)BoxesRunTime.boxToLong((long)11L))})), (Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final String apply(MessageAndMetadata<String, String> m) {
                                return (String)m.message();
                            }
                        }, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(String.class));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topicPartition$2 = topicPartition$2;
                        this.kafkaParams$3 = kafkaParams$3;
                    }
                });
                long $org_scalatest_assert_macro_left = BoxesRunTime.unboxToLong((Object)((DirectKafkaInputDStream)stream).fromOffsets().apply((Object)topicPartition));
                long $org_scalatest_assert_macro_right = offsetBeforeStart;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right);
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Start offset not from latest");
                ConcurrentLinkedQueue<E> collectedData = new ConcurrentLinkedQueue<E>();
                stream.foreachRDD((Function1)new Serializable(this, collectedData){
                    public static final long serialVersionUID = 0L;
                    private final ConcurrentLinkedQueue collectedData$2;

                    public final void apply(RDD<String> rdd) {
                        this.collectedData$2.addAll(Arrays.asList((Object[])rdd.collect()));
                    }
                    {
                        this.collectedData$2 = collectedData$2;
                    }
                });
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().start();
                Map newData = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)10))}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().sendMessages(topic, newData);
                this.$outer.eventually(this.$outer.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.$outer.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), new Serializable(this, collectedData){
                    public static final long serialVersionUID = 0L;
                    private final ConcurrentLinkedQueue collectedData$2;

                    public final boolean apply() {
                        return this.apply$mcZ$sp();
                    }

                    public boolean apply$mcZ$sp() {
                        return this.collectedData$2.contains("b");
                    }
                    {
                        this.collectedData$2 = collectedData$2;
                    }
                });
                ConcurrentLinkedQueue<E> $org_scalatest_assert_macro_left2 = collectedData;
                String $org_scalatest_assert_macro_right2 = "a";
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.notBool(Bool$.MODULE$.binaryMacroBool($org_scalatest_assert_macro_left2, "contains", (Object)$org_scalatest_assert_macro_right2, $org_scalatest_assert_macro_left2.contains($org_scalatest_assert_macro_right2)));
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"");
                StreamingContext qual$3 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc();
                boolean x$11 = qual$3.stop$default$1();
                qual$3.stop(x$11);
            }

            public /* synthetic */ DirectKafkaStreamSuite org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer() {
                return this.$outer;
            }

            public final long org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$getLatestOffset$2(TopicAndPartition topicPartition$2, KafkaCluster kc$2) {
                return ((KafkaCluster.LeaderOffset)((MapLike)kc$2.getLatestLeaderOffsets((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicAndPartition[]{topicPartition$2}))).right().get()).apply((Object)topicPartition$2)).offset();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("offset recovery", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                String topic = "recovery";
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().createTopic(topic);
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir_$eq(Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2()));
                Map kafkaParams = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.broker.list"), (Object)this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().brokerAddress()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"smallest")}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.$outer.sparkConf(), Milliseconds$.MODULE$.apply(100L)));
                InputDStream kafkaStream = (InputDStream)this.$outer.withClue("Error creating direct stream", (Function0)new Serializable(this, topic, kafkaParams){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$4 $outer;
                    private final String topic$2;
                    private final Map kafkaParams$4;

                    public final InputDStream<Tuple2<String, String>> apply() {
                        return KafkaUtils$.MODULE$.createDirectStream(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc(), this.kafkaParams$4, (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.topic$2})), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topic$2 = topic$2;
                        this.kafkaParams$4 = kafkaParams$4;
                    }
                });
                DStream keyedStream = kafkaStream.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Tuple2<String, Object> apply(Tuple2<String, String> v) {
                        return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"key"), (Object)BoxesRunTime.boxToInteger((int)new StringOps(Predef$.MODULE$.augmentString((String)v._2())).toInt()));
                    }
                }, ClassTag$.MODULE$.apply(Tuple2.class));
                DStream stateStream = DStream$.MODULE$.toPairDStreamFunctions(keyedStream, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Int(), (Ordering)Ordering.String$.MODULE$).updateStateByKey((Function2)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Some<Object> apply(Seq<Object> values, Option<Object> state) {
                        return new Some((Object)BoxesRunTime.boxToInteger((int)(BoxesRunTime.unboxToInt((Object)values.sum((Numeric)Numeric.IntIsIntegral$.MODULE$)) + BoxesRunTime.unboxToInt((Object)state.getOrElse((Function0)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final int apply() {
                                return this.apply$mcI$sp();
                            }

                            public int apply$mcI$sp() {
                                return 0;
                            }
                        })))));
                    }
                }, ClassTag$.MODULE$.Int());
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().checkpoint(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir().getAbsolutePath());
                stateStream.foreachRDD((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final void apply(RDD<Tuple2<String, Object>> rdd) {
                        Predef$.MODULE$.refArrayOps((Object[])rdd.collect()).headOption().foreach((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final void apply(Tuple2<String, Object> x) {
                                DirectKafkaStreamSuite$.MODULE$.total().set(x._2$mcI$sp());
                            }
                        });
                    }
                });
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().start();
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).grouped(4).foreach((Function1)new Serializable(this, topic){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$4 $outer;
                    private final String topic$2;

                    public final void apply(IndexedSeq<Object> i) {
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$sendData$1((Seq)i, this.topic$2);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topic$2 = topic$2;
                    }
                });
                this.$outer.eventually(this.$outer.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).seconds())), this.$outer.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$4 $outer;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToLong((long)DirectKafkaStreamSuite$.MODULE$.total().get()));
                        int $org_scalatest_assert_macro_right = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).sum((Numeric)Numeric.IntIsIntegral$.MODULE$);
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()));
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
                StreamingContext qual$4 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc();
                boolean x$12 = qual$4.stop$default$1();
                qual$4.stop(x$12);
                Seq<Tuple2<Time, OffsetRange[]>> offsetRangesAfterStop = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$getOffsetRanges(kafkaStream);
                int $org_scalatest_assert_macro_left = offsetRangesAfterStop.size();
                int $org_scalatest_assert_macro_right = 1;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right);
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"No offset ranges generated");
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(Predef$.MODULE$.refArrayOps((Object[])((Tuple2)offsetRangesAfterStop.head())._2()).forall((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$4 $outer;

                    public final boolean apply(OffsetRange x$4) {
                        return this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToLong((long)x$4.fromOffset())).$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)0), Equality$.MODULE$.default());
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                }), "scala.this.Predef.refArrayOps[org.apache.spark.streaming.kafka.OffsetRange](offsetRangesAfterStop.head._2).forall(((x$4: org.apache.spark.streaming.kafka.OffsetRange) => DirectKafkaStreamSuite.this.convertToEqualizer[Long](x$4.fromOffset).===(0)(scalactic.this.Equality.default[Long])))");
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"starting offset not zero");
                this.$outer.logInfo((Function0)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "====== RESTARTING ========";
                    }
                });
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$testDir().getAbsolutePath()));
                DStream recoveredStream = (DStream)Predef$.MODULE$.refArrayOps((Object[])this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().graph().getInputStreams()).head();
                Seq recoveredOffsetRanges = (Seq)this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$getOffsetRanges(recoveredStream).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Tuple2<Time, Set<OffsetRange>> apply(Tuple2<Time, OffsetRange[]> x) {
                        return new Tuple2(x._1(), (Object)Predef$.MODULE$.refArrayOps((Object[])x._2()).toSet());
                    }
                }, Seq$.MODULE$.canBuildFrom());
                int $org_scalatest_assert_macro_left2 = recoveredOffsetRanges.size();
                int $org_scalatest_assert_macro_right2 = 0;
                Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left2), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2 > $org_scalatest_assert_macro_right2);
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"No offset ranges recovered");
                Seq earlierOffsetRanges = (Seq)offsetRangesAfterStop.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Tuple2<Time, Set<OffsetRange>> apply(Tuple2<Time, OffsetRange[]> x) {
                        return new Tuple2(x._1(), (Object)Predef$.MODULE$.refArrayOps((Object[])x._2()).toSet());
                    }
                }, Seq$.MODULE$.canBuildFrom());
                Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.simpleMacroBool(recoveredOffsetRanges.forall((Function1)new Serializable(this, earlierOffsetRanges){
                    public static final long serialVersionUID = 0L;
                    private final Seq earlierOffsetRanges$1;

                    public final boolean apply(Tuple2<Time, Set<OffsetRange>> or) {
                        return this.earlierOffsetRanges$1.contains((Object)new Tuple2(or._1(), or._2()));
                    }
                    {
                        this.earlierOffsetRanges$1 = earlierOffsetRanges$1;
                    }
                }), "recoveredOffsetRanges.forall(((or: (org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka.OffsetRange])) => earlierOffsetRanges.contains[(org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka.OffsetRange])](scala.Tuple2.apply[org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka.OffsetRange]](or._1, or._2))))");
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)new StringBuilder().append((Object)"Recovered ranges are not the same as the ones generated\n").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"recoveredOffsetRanges: ", "\\n"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{recoveredOffsetRanges}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"earlierOffsetRanges: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{earlierOffsetRanges}))).toString());
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().start();
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(11), 20).grouped(4).foreach((Function1)new Serializable(this, topic){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$4 $outer;
                    private final String topic$2;

                    public final void apply(IndexedSeq<Object> i) {
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$sendData$1((Seq)i, this.topic$2);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topic$2 = topic$2;
                    }
                });
                this.$outer.eventually(this.$outer.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).seconds())), this.$outer.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$4 $outer;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToLong((long)DirectKafkaStreamSuite$.MODULE$.total().get()));
                        int $org_scalatest_assert_macro_right = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 20).sum((Numeric)Numeric.IntIsIntegral$.MODULE$);
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()));
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
                StreamingContext qual$5 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc();
                boolean x$13 = qual$5.stop$default$1();
                qual$5.stop(x$13);
            }

            public /* synthetic */ DirectKafkaStreamSuite org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer() {
                return this.$outer;
            }

            public final void org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$sendData$1(Seq data, String topic$2) {
                Seq strings = (Seq)data.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(int x$2) {
                        return ((Object)BoxesRunTime.boxToInteger((int)x$2)).toString();
                    }
                }, Seq$.MODULE$.canBuildFrom());
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().sendMessages(topic$2, ((TraversableOnce)strings.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final Tuple2<String, Object> apply(String x$3) {
                        return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)x$3), (Object)BoxesRunTime.boxToInteger((int)1));
                    }
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("Direct Kafka stream report input information", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                String topic = "report-test";
                Map data = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)7)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)9))}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().createTopic(topic);
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().sendMessages(topic, data);
                int totalSent = BoxesRunTime.unboxToInt((Object)data.values().sum((Numeric)Numeric.IntIsIntegral$.MODULE$));
                Map kafkaParams = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.broker.list"), (Object)this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().brokerAddress()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"smallest")}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(this.$outer.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
                InputInfoCollector collector = new InputInfoCollector();
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().addStreamingListener((StreamingListener)collector);
                InputDStream stream = (InputDStream)this.$outer.withClue("Error creating direct stream", (Function0)new Serializable(this, topic, kafkaParams){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$5 $outer;
                    private final String topic$3;
                    private final Map kafkaParams$5;

                    public final InputDStream<Tuple2<String, String>> apply() {
                        return KafkaUtils$.MODULE$.createDirectStream(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc(), this.kafkaParams$5, (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.topic$3})), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topic$3 = topic$3;
                        this.kafkaParams$5 = kafkaParams$5;
                    }
                });
                ConcurrentLinkedQueue<E> allReceived = new ConcurrentLinkedQueue<E>();
                stream.foreachRDD((Function1)new Serializable(this, allReceived){
                    public static final long serialVersionUID = 0L;
                    private final ConcurrentLinkedQueue allReceived$2;

                    public final void apply(RDD<Tuple2<String, String>> rdd) {
                        this.allReceived$2.addAll(Arrays.asList((Object[])rdd.collect()));
                    }
                    {
                        this.allReceived$2 = allReceived$2;
                    }
                });
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().start();
                this.$outer.eventually(this.$outer.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20000)).milliseconds())), this.$outer.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(200)).milliseconds())), new Serializable(this, totalSent, collector, allReceived){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$5 $outer;
                    private final int totalSent$2;
                    private final InputInfoCollector collector$1;
                    private final ConcurrentLinkedQueue allReceived$2;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToInteger((int)this.allReceived$2.size()));
                        int $org_scalatest_assert_macro_right = this.totalSent$2;
                        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()));
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder().append((Object)"didn't get expected number of messages, messages:\n").append((Object)((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)this.allReceived$2).asScala()).mkString("\n")).toString());
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToLong((long)this.collector$1.numRecordsSubmitted().get()));
                        int $org_scalatest_assert_macro_right2 = this.totalSent$2;
                        Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Equality$.MODULE$.default()));
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"");
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left3 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToLong((long)this.collector$1.numRecordsStarted().get()));
                        int $org_scalatest_assert_macro_right3 = this.totalSent$2;
                        Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left3.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), Equality$.MODULE$.default()));
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"");
                        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left4 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().convertToEqualizer(BoxesRunTime.boxToLong((long)this.collector$1.numRecordsCompleted().get()));
                        int $org_scalatest_assert_macro_right4 = this.totalSent$2;
                        Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left4, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), $org_scalatest_assert_macro_left4.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), Equality$.MODULE$.default()));
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"");
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.totalSent$2 = totalSent$2;
                        this.collector$1 = collector$1;
                        this.allReceived$2 = allReceived$2;
                    }
                });
                StreamingContext qual$6 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc();
                boolean x$14 = qual$6.stop$default$1();
                qual$6.stop(x$14);
            }

            public /* synthetic */ DirectKafkaStreamSuite org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("maxMessagesPerPartition with backpressure disabled", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                String topic = "maxMessagesPerPartition";
                DirectKafkaInputDStream<String, String, StringDecoder, StringDecoder, Tuple2<String, String>> kafkaStream = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$getDirectKafkaStream(topic, (Option<RateController>)None$.MODULE$);
                Map input = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)50L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)50L))}));
                Map $org_scalatest_assert_macro_left = (Map)kafkaStream.maxMessagesPerPartition(input).get();
                Map $org_scalatest_assert_macro_right = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)10L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)10L))}));
                Map map = $org_scalatest_assert_macro_left;
                Map map2 = $org_scalatest_assert_macro_right;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(map != null ? !map.equals(map2) : map2 != null));
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("maxMessagesPerPartition with no lag", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                String topic = "maxMessagesPerPartition";
                Some rateController = new Some((Object)((Object)new ConstantRateController(0, new ConstantEstimator(100L), 100L)));
                DirectKafkaInputDStream<String, String, StringDecoder, StringDecoder, Tuple2<String, String>> kafkaStream = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$getDirectKafkaStream(topic, (Option<RateController>)rateController);
                Map input = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)0L))}));
                Option $org_scalatest_assert_macro_left = kafkaStream.maxMessagesPerPartition(input);
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left, "isEmpty", $org_scalatest_assert_macro_left.isEmpty());
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("maxMessagesPerPartition respects max rate", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                String topic = "maxMessagesPerPartition";
                Some rateController = new Some((Object)((Object)new ConstantRateController(0, new ConstantEstimator(100L), 1000L)));
                DirectKafkaInputDStream<String, String, StringDecoder, StringDecoder, Tuple2<String, String>> kafkaStream = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$getDirectKafkaStream(topic, (Option<RateController>)rateController);
                Map input = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)1000L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)1000L))}));
                Map $org_scalatest_assert_macro_left = (Map)kafkaStream.maxMessagesPerPartition(input).get();
                Map $org_scalatest_assert_macro_right = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)10L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicAndPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)10L))}));
                Map map = $org_scalatest_assert_macro_left;
                Map map2 = $org_scalatest_assert_macro_right;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(map != null ? !map.equals(map2) : map2 != null));
                this.$outer.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.test("using rate controller", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ DirectKafkaStreamSuite $outer;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                String topic = "backpressure";
                Set topicPartitions = (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicAndPartition[]{new TopicAndPartition(topic, 0), new TopicAndPartition(topic, 1)}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().createTopic(topic, 2);
                Map kafkaParams = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"metadata.broker.list"), (Object)this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().brokerAddress()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"smallest")}));
                int batchIntervalMilliseconds = 100;
                ConstantEstimator estimator = new ConstantEstimator(100L);
                Map messages = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"foo"), (Object)BoxesRunTime.boxToInteger((int)200))}));
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$kafkaTestUtils().sendMessages(topic, messages);
                SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName(((Object)((Object)this.$outer)).getClass().getSimpleName()).set("spark.streaming.kafka.maxRatePerPartition", "100");
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc_$eq(new StreamingContext(sparkConf, Milliseconds$.MODULE$.apply((long)batchIntervalMilliseconds)));
                DirectKafkaInputDStream kafkaStream = (DirectKafkaInputDStream)this.$outer.withClue("Error creating direct stream", (Function0)new Serializable(this, topicPartitions, kafkaParams, estimator){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$9 $outer;
                    private final Set topicPartitions$1;
                    public final Map kafkaParams$6;
                    public final ConstantEstimator estimator$1;

                    public final $anonfun$9$$anonfun$24$$anon$2 apply() {
                        KafkaCluster kc = new KafkaCluster(this.kafkaParams$6);
                        Serializable messageHandler = new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final Tuple2<String, String> apply(MessageAndMetadata<String, String> mmd) {
                                return new Tuple2(mmd.key(), mmd.message());
                            }
                        };
                        Map m = (Map)kc.getEarliestLeaderOffsets(this.topicPartitions$1).fold((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final Map<TopicAndPartition, Object> apply(ArrayBuffer<Throwable> e) {
                                return Predef$.MODULE$.Map().empty();
                            }
                        }, (Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final Map<TopicAndPartition, Object> apply(Map<TopicAndPartition, KafkaCluster.LeaderOffset> m) {
                                return m.mapValues((Function1)new Serializable(this){
                                    public static final long serialVersionUID = 0L;

                                    public final long apply(KafkaCluster.LeaderOffset lo) {
                                        return lo.offset();
                                    }
                                });
                            }
                        });
                        return new DirectKafkaInputDStream<String, String, StringDecoder, StringDecoder, Tuple2<String, String>>(this, (Function1)messageHandler, m){
                            private final Some<DirectKafkaInputDStream.DirectKafkaRateController> rateController;

                            public Some<DirectKafkaInputDStream.DirectKafkaRateController> rateController() {
                                return this.rateController;
                            }
                            {
                                super($outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc(), $outer.kafkaParams$6, m$1, messageHandler$2, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(Tuple2.class));
                                this.rateController = new Some((Object)new DirectKafkaInputDStream.DirectKafkaRateController((DirectKafkaInputDStream)this, this.id(), (RateEstimator)$outer.estimator$1));
                            }
                        };
                    }

                    public /* synthetic */ $anonfun$9 org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.topicPartitions$1 = topicPartitions$1;
                        this.kafkaParams$6 = kafkaParams$6;
                        this.estimator$1 = estimator$1;
                    }
                });
                ConcurrentLinkedQueue<E> collectedData = new ConcurrentLinkedQueue<E>();
                kafkaStream.foreachRDD((Function2)new Serializable(this, collectedData){
                    public static final long serialVersionUID = 0L;
                    private final ConcurrentLinkedQueue collectedData$3;

                    public final void apply(RDD<Tuple2<String, String>> rdd, Time time) {
                        String[] data = (String[])rdd.map((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final String apply(Tuple2<String, String> x$6) {
                                return (String)x$6._2();
                            }
                        }, ClassTag$.MODULE$.apply(String.class)).collect();
                        this.collectedData$3.add(data);
                    }
                    {
                        this.collectedData$3 = collectedData$3;
                    }
                });
                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc().start();
                ((IterableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 50, 20}))).foreach((Function1)new Serializable(this, batchIntervalMilliseconds, estimator, collectedData){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$9 $outer;
                    private final int batchIntervalMilliseconds$1;
                    private final ConstantEstimator estimator$1;
                    public final ConcurrentLinkedQueue collectedData$3;

                    public final void apply(int rate) {
                        this.apply$mcVI$sp(rate);
                    }

                    public void apply$mcVI$sp(int rate) {
                        this.collectedData$3.clear();
                        this.estimator$1.updateRate(rate);
                        long expectedSize = Math.round((double)(rate * this.batchIntervalMilliseconds$1) * 0.001);
                        this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().eventually(this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds())), this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(this.batchIntervalMilliseconds$1)).milliseconds())), new Serializable(this, expectedSize, rate){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ $anonfun$9$$anonfun$apply$mcV$sp$9 $outer;
                            public final long expectedSize$1;
                            private final int rate$1;

                            public final void apply() {
                                this.apply$mcV$sp();
                            }

                            public void apply$mcV$sp() {
                                Iterable $org_scalatest_assert_macro_left = (Iterable)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)this.$outer.collectedData$3).asScala();
                                long $org_scalatest_assert_macro_right = this.expectedSize$1;
                                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.existsMacroBool((Object)$org_scalatest_assert_macro_left, (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.exists((Function1)new Serializable(this){
                                    public static final long serialVersionUID = 0L;
                                    private final /* synthetic */ $anonfun$9$$anonfun$apply$mcV$sp$9$$anonfun$apply$mcVI$sp$1 $outer;

                                    public final boolean apply(String[] x$7) {
                                        return (long)Predef$.MODULE$.refArrayOps((Object[])x$7).size() == this.$outer.expectedSize$1;
                                    }
                                    {
                                        if ($outer == null) {
                                            throw null;
                                        }
                                        this.$outer = $outer;
                                    }
                                }));
                                this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer().assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{" - No arrays of size ", " for rate ", " found in ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)this.expectedSize$1), BoxesRunTime.boxToInteger((int)this.rate$1), this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer().org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$dataToString$1(this.$outer.collectedData$3)})));
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                                this.expectedSize$1 = expectedSize$1;
                                this.rate$1 = rate$1;
                            }
                        });
                    }

                    public /* synthetic */ $anonfun$9 org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.batchIntervalMilliseconds$1 = batchIntervalMilliseconds$1;
                        this.estimator$1 = estimator$1;
                        this.collectedData$3 = collectedData$3;
                    }
                });
                StreamingContext qual$7 = this.$outer.org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$ssc();
                boolean x$15 = qual$7.stop$default$1();
                qual$7.stop(x$15);
            }

            public /* synthetic */ DirectKafkaStreamSuite org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$$outer() {
                return this.$outer;
            }

            public final String org$apache$spark$streaming$kafka$DirectKafkaStreamSuite$$anonfun$$dataToString$1(ConcurrentLinkedQueue collectedData$3) {
                return ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)collectedData$3).asScala()).map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final String apply(String[] x$5) {
                        return Predef$.MODULE$.refArrayOps((Object[])x$5).mkString("[", ",", "]");
                    }
                }, Iterable$.MODULE$.canBuildFrom())).mkString("{", ", ", "}");
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    public static class InputInfoCollector
    implements StreamingListener {
        private final AtomicLong numRecordsSubmitted;
        private final AtomicLong numRecordsStarted;
        private final AtomicLong numRecordsCompleted;

        public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {
            StreamingListener.class.onStreamingStarted((StreamingListener)this, (StreamingListenerStreamingStarted)streamingStarted);
        }

        public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
            StreamingListener.class.onReceiverStarted((StreamingListener)this, (StreamingListenerReceiverStarted)receiverStarted);
        }

        public void onReceiverError(StreamingListenerReceiverError receiverError) {
            StreamingListener.class.onReceiverError((StreamingListener)this, (StreamingListenerReceiverError)receiverError);
        }

        public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
            StreamingListener.class.onReceiverStopped((StreamingListener)this, (StreamingListenerReceiverStopped)receiverStopped);
        }

        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {
            StreamingListener.class.onOutputOperationStarted((StreamingListener)this, (StreamingListenerOutputOperationStarted)outputOperationStarted);
        }

        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {
            StreamingListener.class.onOutputOperationCompleted((StreamingListener)this, (StreamingListenerOutputOperationCompleted)outputOperationCompleted);
        }

        public AtomicLong numRecordsSubmitted() {
            return this.numRecordsSubmitted;
        }

        public AtomicLong numRecordsStarted() {
            return this.numRecordsStarted;
        }

        public AtomicLong numRecordsCompleted() {
            return this.numRecordsCompleted;
        }

        public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
            this.numRecordsSubmitted().addAndGet(batchSubmitted.batchInfo().numRecords());
        }

        public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
            this.numRecordsStarted().addAndGet(batchStarted.batchInfo().numRecords());
        }

        public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
            this.numRecordsCompleted().addAndGet(batchCompleted.batchInfo().numRecords());
        }

        public InputInfoCollector() {
            StreamingListener.class.$init$((StreamingListener)this);
            this.numRecordsSubmitted = new AtomicLong(0L);
            this.numRecordsStarted = new AtomicLong(0L);
            this.numRecordsCompleted = new AtomicLong(0L);
        }
    }
}

