package org.apache.spark.sql.kafka010;

import java.io.File;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StreamTest;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.Tag;
import scala.Array$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: KafkaMicroBatchSourceSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001Y1AAA\u0002\u0001\u001d!)1\u0003\u0001C\u0001)\ta2*\u00194lC6K7M]8CCR\u001c\u0007N\u0016\u001aT_V\u00148-Z*vSR,'B\u0001\u0003\u0006\u0003!Y\u0017MZ6baE\u0002$B\u0001\u0004\b\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u0011%\tQa\u001d9be.T!AC\u0006\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005a\u0011aA8sO\u000e\u00011C\u0001\u0001\u0010!\t\u0001\u0012#D\u0001\u0004\u0013\t\u00112A\u0001\u0010LC\u001a\\\u0017-T5de>\u0014\u0015\r^2i'>,(oY3Tk&$XMQ1tK\u00061A(\u001b8jiz\"\u0012!\u0006\t\u0003!\u0001\u0001")
/* loaded from: input_file:org/apache/spark/sql/kafka010/KafkaMicroBatchV2SourceSuite.class */
public class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
    /**
     * Plan-node predicate: accepts only a {@link StreamingDataSourceV2Relation}
     * whose {@code stream()} is a {@link KafkaMicroBatchStream}.
     *
     * @param logicalPlan the plan node to inspect
     * @return {@code true} when the node is a V2 streaming relation backed by the Kafka micro-batch stream
     */
    public static final /* synthetic */ boolean $anonfun$new$130(LogicalPlan logicalPlan) {
        // Any other node type can never match.
        if (!(logicalPlan instanceof StreamingDataSourceV2Relation)) {
            return false;
        }
        StreamingDataSourceV2Relation relation = (StreamingDataSourceV2Relation) logicalPlan;
        return relation.stream() instanceof KafkaMicroBatchStream;
    }

    /**
     * Reports whether the query's logical plan contains at least one node
     * accepted by {@code $anonfun$new$130}.
     *
     * @param streamExecution the running query whose logical plan is searched
     * @return {@code true} when a matching node is found
     */
    public static final /* synthetic */ boolean $anonfun$new$129(StreamExecution streamExecution) {
        Option kafkaV2Node = streamExecution.logicalPlan().find(node -> BoxesRunTime.boxToBoolean($anonfun$new$130(node)));
        return kafkaV2Node.isDefined();
    }

    /**
     * Plans the input partitions for offsets 0..100 of {@code topicPartition} and asserts
     * that exactly {@code i} partitions were produced.
     *
     * <p>The source options always contain {@code kafka.bootstrap.servers} and {@code subscribe};
     * {@code minPartitions} is added only when {@code str2} is non-null.
     *
     * @param kafkaMicroBatchV2SourceSuite the suite instance supplying test utilities and assertions
     * @param str the topic passed as the {@code subscribe} option
     * @param str2 the {@code minPartitions} option value, or {@code null} to leave it unset
     * @param topicPartition the topic partition used as the key of the start and end offsets
     * @param i the expected number of planned input partitions
     * @param file directory whose absolute path is handed to {@code toMicroBatchStream}
     */
    public static final /* synthetic */ void $anonfun$new$132(KafkaMicroBatchV2SourceSuite kafkaMicroBatchV2SourceSuite, String str, String str2, TopicPartition topicPartition, int i, File file) {
        KafkaSourceProvider kafkaSourceProvider = new KafkaSourceProvider();
        // Option.apply(null) is empty, so a null str2 contributes no "minPartitions" entry.
        CaseInsensitiveStringMap caseInsensitiveStringMap = new CaseInsensitiveStringMap((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.bootstrap.servers"), kafkaMicroBatchV2SourceSuite.testUtils().brokerAddress()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("subscribe"), str)})).$plus$plus(Option$.MODULE$.option2Iterable(Option$.MODULE$.apply(str2).map(str3 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("minPartitions"), str3);
        })))).asJava());
        // Build the micro-batch stream and plan partitions between offset 0 and offset 100.
        KafkaBatchInputPartition[] kafkaBatchInputPartitionArr = (KafkaBatchInputPartition[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(kafkaSourceProvider.getTable(caseInsensitiveStringMap).newScanBuilder(caseInsensitiveStringMap).build().toMicroBatchStream(file.getAbsolutePath()).planInputPartitions(new KafkaSourceOffset(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(0L))}))), new KafkaSourceOffset(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(100L))})))))).map(inputPartition -> {
            return (KafkaBatchInputPartition) inputPartition;
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(KafkaBatchInputPartition.class)));
        // Render the array contents in the clue: appending the array reference itself would only
        // print its type and identity hash, which is useless when the assertion fails.
        String plannedPartitions = java.util.Arrays.toString(kafkaBatchInputPartitionArr);
        kafkaMicroBatchV2SourceSuite.withClue(new StringBuilder(39).append("minPartitions = ").append(str2).append(" generated factories ").append(plannedPartitions).append("\n\t").toString(), () -> {
            Object[] refArrayOps = Predef$.MODULE$.refArrayOps(kafkaBatchInputPartitionArr);
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(new ArrayOps.ofRef(refArrayOps), "size", BoxesRunTime.boxToInteger(new ArrayOps.ofRef(refArrayOps).size()), BoxesRunTime.boxToInteger(i), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1383));
        });
    }

    /**
     * Runs one minPartitions scenario inside a temporary directory.
     *
     * <p>Sets this suite's Spark session as the active session, then delegates to
     * {@code $anonfun$new$132} with the temp directory supplied by {@code withTempDir}.
     * (JADX note: access modifier was changed from private.)
     *
     * @param str the {@code minPartitions} option value; may be {@code null}
     * @param i the expected number of planned input partitions
     * @param z not read by this method
     * @param str2 the topic to subscribe to
     * @param topicPartition the topic partition used for the offset range
     */
    public final void test$1(String str, int i, boolean z, String str2, TopicPartition topicPartition) {
        SparkSession$.MODULE$.setActiveSession(spark());
        withTempDir(tempDir -> {
            // Argument order is intentional: the helper takes the topic before minPartitions.
            $anonfun$new$132(this, str2, str, topicPartition, i, tempDir);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds a producer record for {@code str} carrying the decimal text of {@code i},
     * targeted at partition {@code i - 31} and tagged with {@code seq} as headers.
     *
     * @param str first constructor argument of {@code RecordBuilder} (presumably the topic — confirm)
     * @param seq the headers attached to the record
     * @param i the number whose string form is sent; also determines the partition
     * @return the record produced by {@code RecordBuilder.build()}
     */
    public static final /* synthetic */ ProducerRecord $anonfun$new$146(String str, Seq seq, int i) {
        String payload = Integer.toString(i);
        int targetPartition = i - 31;
        RecordBuilder builder = new RecordBuilder(str, payload);
        return builder.partition(targetPartition).headers(seq).build();
    }

    /**
     * Parses the second element of the pair as an integer and adds one.
     *
     * @param tuple2 a pair whose second element is cast to {@code String}
     * @return the parsed value plus one
     */
    public static final /* synthetic */ int $anonfun$new$148(Tuple2 tuple2) {
        String second = (String) tuple2._2();
        StringOps digits = new StringOps(Predef$.MODULE$.augmentString(second));
        return 1 + digits.toInt();
    }

    /**
     * Converts a string to an int through Scala's {@code StringOps.toInt}.
     *
     * @param str the text to parse
     * @return the parsed integer
     */
    public static final /* synthetic */ int $anonfun$new$152(String str) {
        StringOps digits = new StringOps(Predef$.MODULE$.augmentString(str));
        return digits.toInt();
    }

    /**
     * Accepts a source progress whose metrics are non-empty and whose
     * {@code avgOffsetsBehindLatest} metric parses to a value greater than zero.
     *
     * @param sourceProgress the progress entry to test
     * @return {@code true} when the entry reports a positive average lag
     */
    public static final /* synthetic */ boolean $anonfun$new$155(SourceProgress sourceProgress) {
        // Entries without metrics are rejected before any parsing is attempted.
        if (sourceProgress.metrics().isEmpty()) {
            return false;
        }
        String avgBehind = (String) sourceProgress.metrics().get("avgOffsetsBehindLatest");
        double parsed = new StringOps(Predef$.MODULE$.augmentString(avgBehind)).toDouble();
        return parsed > 0.0d;
    }

    /**
     * Converts a string to an int through Scala's {@code StringOps.toInt}.
     *
     * @param str the text to parse
     * @return the parsed integer
     */
    public static final /* synthetic */ int $anonfun$new$159(String str) {
        StringOps parser = new StringOps(Predef$.MODULE$.augmentString(str));
        int parsed = parser.toInt();
        return parsed;
    }

    /**
     * Registers the tests of this suite.
     *
     * <p>Each {@code test(...)} / {@code testWithUninterruptibleThread(...)} call receives a test
     * name and a lambda holding the test body. The {@code Position} arguments carry the file name
     * {@code KafkaMicroBatchSourceSuite.scala} and a line number from that file.
     */
    public KafkaMicroBatchV2SourceSuite() {
        // Test: a stream on a pattern-subscribed new topic must have a logical plan containing a
        // StreamingDataSourceV2Relation backed by KafkaMicroBatchStream (see $anonfun$new$129).
        test("V2 Source is used by default", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            String newTopic = this.newTopic();
            KafkaTestUtils testUtils = this.testUtils();
            testUtils.createTopic(newTopic, 5, testUtils.createTopic$default$3());
            this.testStream(this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("kafka.metadata.max.age.ms", "1").option("subscribePattern", new StringBuilder(2).append(newTopic).append(".*").toString()).load(), this.testStream$default$2(), Predef$.MODULE$.wrapRefArray(new StreamTest.StreamAction[]{this.makeSureGetOffsetCalled(), this.AssertOnQuery().apply(streamExecution -> {
                return BoxesRunTime.boxToBoolean($anonfun$new$129(streamExecution));
            }, this.AssertOnQuery().apply$default$2())}));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1335));
        // Test: minPartitions handling. test$1(minPartitions, expectedPartitions, ...) is run for
        // valid values (null, "1", "4"); the values "a", "1.0", "0" and "-1" must each raise
        // IllegalArgumentException.
        testWithUninterruptibleThread("minPartitions is supported", testWithUninterruptibleThread$default$2(), () -> {
            String newTopic = this.newTopic();
            TopicPartition topicPartition = new TopicPartition(newTopic, 0);
            KafkaTestUtils testUtils = this.testUtils();
            testUtils.createTopic(newTopic, 1, testUtils.createTopic$default$3());
            this.test$1(null, 1, true, newTopic, topicPartition);
            this.test$1("1", 1, true, newTopic, topicPartition);
            this.test$1("4", 4, false, newTopic, topicPartition);
            this.intercept(() -> {
                this.test$1("a", 1, true, newTopic, topicPartition);
            }, ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1394));
            this.intercept(() -> {
                this.test$1("1.0", 1, true, newTopic, topicPartition);
            }, ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1395));
            this.intercept(() -> {
                this.test$1("0", 1, true, newTopic, topicPartition);
            }, ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1396));
            this.intercept(() -> {
                this.test$1("-1", 1, true, newTopic, topicPartition);
            }, ClassTag$.MODULE$.apply(IllegalArgumentException.class), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1397));
        });
        // Test: a query is restarted from a checkpoint copied out of the bundled resource
        // "checkpoint-version-2.4.3-kafka-include-headers-default" and must still produce answers.
        test("default config of includeHeader doesn't break existing query from Spark 2.4", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // Fixed topic name; the literal is repeated below rather than reusing `str`.
            String str = "spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62";
            this.testUtils().createTopic("spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62", 5, true);
            // Five batches of three stringified ints; the Some(n) argument runs from 0 to 4
            // (presumably the target partition — confirm against KafkaTestUtils.sendMessages).
            this.testUtils().sendMessages("spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62", (String[]) new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{-20, -21, -22})).map(obj -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj));
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), new Some(BoxesRunTime.boxToInteger(0)));
            this.testUtils().sendMessages("spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62", (String[]) new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{-10, -11, -12})).map(obj2 -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj2));
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), new Some(BoxesRunTime.boxToInteger(1)));
            this.testUtils().sendMessages("spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62", (String[]) new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{0, 1, 2})).map(obj3 -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj3));
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), new Some(BoxesRunTime.boxToInteger(2)));
            this.testUtils().sendMessages("spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62", (String[]) new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{10, 11, 12})).map(obj4 -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj4));
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), new Some(BoxesRunTime.boxToInteger(3)));
            this.testUtils().sendMessages("spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62", (String[]) new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps(new int[]{20, 21, 22})).map(obj5 -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj5));
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), new Some(BoxesRunTime.boxToInteger(4)));
            // Precondition: the latest-offsets map for the topic must have exactly 5 entries.
            Predef$.MODULE$.require(this.convertToEqualizer(BoxesRunTime.boxToInteger(this.testUtils().getLatestOffsets((Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62"}))).size())).$eq$eq$eq(BoxesRunTime.boxToInteger(5), Equality$.MODULE$.default()));
            // Headers ("a" -> "b", "c" -> "d") attached to the records 31..35 sent next; each
            // record goes to partition (value - 31), see $anonfun$new$146.
            Seq colonVar = new $colon.colon(new Tuple2("a", "b".getBytes(StandardCharsets.UTF_8)), new $colon.colon(new Tuple2("c", "d".getBytes(StandardCharsets.UTF_8)), Nil$.MODULE$));
            ((IterableLike) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(31), 35).map(obj6 -> {
                return $anonfun$new$146(str, colonVar, BoxesRunTime.unboxToInt(obj6));
            }, IndexedSeq$.MODULE$.canBuildFrom())).foreach(producerRecord -> {
                return this.testUtils().sendMessage(producerRecord);
            });
            // NOTE(review): this null outer reference looks like a decompiler artifact of the
            // anonymous TypeCreator below — confirm before relying on it.
            final KafkaMicroBatchV2SourceSuite kafkaMicroBatchV2SourceSuite = null;
            // Read key/value as strings into (String, String) pairs, then map each pair to
            // (value parsed as int) + 1 via $anonfun$new$148.
            Dataset<?> map = this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("kafka.metadata.max.age.ms", "1").option("subscribePattern", "spark-test-topic-2b8619f5-d3c4-4c2d-b5d1-8d9d9458aa62").option("startingOffsets", "earliest").load().dropDuplicates().selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"CAST(key AS STRING)", "CAST(value AS STRING)"})).as(this.testImplicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaMicroBatchV2SourceSuite.class.getClassLoader()), new TypeCreator(kafkaMicroBatchV2SourceSuite) { // from class: org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite$$typecreator5$2
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple2"), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), Nil$.MODULE$)));
                }
            }))).map(tuple2 -> {
                return BoxesRunTime.boxToInteger($anonfun$new$148(tuple2));
            }, this.testImplicits().newIntEncoder());
            // Copy the bundled checkpoint into a fresh temp directory and start the stream from it.
            URI uri = this.getClass().getResource("/structured-streaming/checkpoint-version-2.4.3-kafka-include-headers-default/").toURI();
            File canonicalFile = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2()).getCanonicalFile();
            FileUtils.copyDirectory(new File(uri), canonicalFile);
            // Expected new answers 32..36 correspond to the sent values 31..35 plus one.
            this.testStream(map, this.testStream$default$2(), Predef$.MODULE$.wrapRefArray(new StreamTest.StreamAction[]{new StreamTest.StartStream(this, this.StartStream().apply$default$1(), this.StartStream().apply$default$2(), this.StartStream().apply$default$3(), canonicalFile.getAbsolutePath()), this.makeSureGetOffsetCalled(), this.CheckNewAnswer().apply(BoxesRunTime.boxToInteger(32), Predef$.MODULE$.wrapIntArray(new int[]{33, 34, 35, 36}), this.testImplicits().newIntEncoder())}));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1400));
        // Test: with maxOffsetsPerTrigger = 1, some recent progress entry must report the three
        // *OffsetsBehindLatest metrics, all strictly greater than zero.
        test("test custom metrics - with rate limit", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            String newTopic = this.newTopic();
            Range.Inclusive inclusive = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10);
            KafkaTestUtils testUtils = this.testUtils();
            testUtils.createTopic(newTopic, 2, testUtils.createTopic$default$3());
            // Values 1..5 are sent with Some(0) and 6..10 with Some(1).
            this.testUtils().sendMessages(newTopic, (String[]) ((TraversableOnce) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).map(obj -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj));
            }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), new Some(BoxesRunTime.boxToInteger(0)));
            this.testUtils().sendMessages(newTopic, (String[]) ((TraversableOnce) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(6), 10).map(obj2 -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj2));
            }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), new Some(BoxesRunTime.boxToInteger(1)));
            this.testStream(this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("subscribe", newTopic).option("maxOffsetsPerTrigger", 1L).option(KafkaSourceProvider$.MODULE$.STARTING_OFFSETS_OPTION_KEY(), "earliest").load().selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"CAST(value AS STRING)"})).as(this.testImplicits().newStringEncoder()).map(str -> {
                return BoxesRunTime.boxToInteger($anonfun$new$152(str));
            }, this.testImplicits().newIntEncoder()), this.testStream$default$2(), Predef$.MODULE$.wrapRefArray(new StreamTest.StreamAction[]{new StreamTest.StartStream(this, this.StartStream().apply$default$1(), this.StartStream().apply$default$2(), this.StartStream().apply$default$3(), this.StartStream().apply$default$4()), this.makeSureGetOffsetCalled(), this.CheckAnswer().apply(inclusive, this.testImplicits().newIntEncoder()), this.Execute().apply(streamExecution -> {
                // Take the first source of each recent progress, reverse the array, and pick the
                // first entry accepted by $anonfun$new$155 (non-empty metrics, avg lag > 0).
                Option find = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(streamExecution.recentProgress())).map(streamingQueryProgress -> {
                    return (SourceProgress) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(streamingQueryProgress.sources())).head();
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(SourceProgress.class))))).reverse())).find(sourceProgress -> {
                    return BoxesRunTime.boxToBoolean($anonfun$new$155(sourceProgress));
                });
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(find, "nonEmpty", find.nonEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1489));
                Map metrics = ((SourceProgress) find.get()).metrics();
                // The metric key set must be exactly the three *OffsetsBehindLatest names.
                TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(metrics.keySet());
                java.util.Set set = (java.util.Set) JavaConverters$.MODULE$.setAsJavaSetConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"minOffsetsBehindLatest", "maxOffsetsBehindLatest", "avgOffsetsBehindLatest"}))).asJava();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", set, convertToEqualizer.$eq$eq$eq(set, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1491));
                // min, max and avg must each be strictly positive.
                long j = new StringOps(Predef$.MODULE$.augmentString((String) metrics.get("minOffsetsBehindLatest"))).toLong();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(j), ">", BoxesRunTime.boxToInteger(0), j > ((long) 0), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1495));
                long j2 = new StringOps(Predef$.MODULE$.augmentString((String) metrics.get("maxOffsetsBehindLatest"))).toLong();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(j2), ">", BoxesRunTime.boxToInteger(0), j2 > ((long) 0), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1496));
                double d = new StringOps(Predef$.MODULE$.augmentString((String) metrics.get("avgOffsetsBehindLatest"))).toDouble();
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToDouble(d), ">", BoxesRunTime.boxToInteger(0), d > ((double) 0), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1497));
            })}));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1458));
        // Test: without maxOffsetsPerTrigger, the last recent progress entry must report the three
        // *OffsetsBehindLatest metrics, all equal to zero.
        test("test custom metrics - no rate limit", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            String newTopic = this.newTopic();
            Range.Inclusive inclusive = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10);
            KafkaTestUtils testUtils = this.testUtils();
            testUtils.createTopic(newTopic, 2, testUtils.createTopic$default$3());
            // Values 1..5 are sent with Some(0) and 6..10 with Some(1).
            this.testUtils().sendMessages(newTopic, (String[]) ((TraversableOnce) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).map(obj -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj));
            }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), new Some(BoxesRunTime.boxToInteger(0)));
            this.testUtils().sendMessages(newTopic, (String[]) ((TraversableOnce) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(6), 10).map(obj2 -> {
                return Integer.toString(BoxesRunTime.unboxToInt(obj2));
            }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), new Some(BoxesRunTime.boxToInteger(1)));
            this.testStream(this.spark().readStream().format("kafka").option("kafka.bootstrap.servers", this.testUtils().brokerAddress()).option("subscribe", newTopic).option(KafkaSourceProvider$.MODULE$.STARTING_OFFSETS_OPTION_KEY(), "earliest").load().selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"CAST(value AS STRING)"})).as(this.testImplicits().newStringEncoder()).map(str -> {
                return BoxesRunTime.boxToInteger($anonfun$new$159(str));
            }, this.testImplicits().newIntEncoder()), this.testStream$default$2(), Predef$.MODULE$.wrapRefArray(new StreamTest.StreamAction[]{new StreamTest.StartStream(this, this.StartStream().apply$default$1(), this.StartStream().apply$default$2(), this.StartStream().apply$default$3(), this.StartStream().apply$default$4()), this.makeSureGetOffsetCalled(), this.CheckAnswer().apply(inclusive, this.testImplicits().newIntEncoder()), this.Execute().apply(streamExecution -> {
                // Take the first source of each recent progress and keep the last one, if any.
                Option lastOption = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(streamExecution.recentProgress())).map(streamingQueryProgress -> {
                    return (SourceProgress) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(streamingQueryProgress.sources())).head();
                }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(SourceProgress.class))))).lastOption();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(lastOption, "nonEmpty", lastOption.nonEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1528));
                Map metrics = ((SourceProgress) lastOption.get()).metrics();
                // The metric key set must be exactly the three *OffsetsBehindLatest names.
                TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(metrics.keySet());
                java.util.Set set = (java.util.Set) JavaConverters$.MODULE$.setAsJavaSetConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"minOffsetsBehindLatest", "maxOffsetsBehindLatest", "avgOffsetsBehindLatest"}))).asJava();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", set, convertToEqualizer.$eq$eq$eq(set, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1531));
                // min, max and avg must each equal zero.
                TripleEqualsSupport.Equalizer convertToEqualizer2 = this.convertToEqualizer(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString((String) metrics.get("minOffsetsBehindLatest"))).toLong()));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", BoxesRunTime.boxToInteger(0), convertToEqualizer2.$eq$eq$eq(BoxesRunTime.boxToInteger(0), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1535));
                TripleEqualsSupport.Equalizer convertToEqualizer3 = this.convertToEqualizer(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString((String) metrics.get("maxOffsetsBehindLatest"))).toLong()));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer3, "===", BoxesRunTime.boxToInteger(0), convertToEqualizer3.$eq$eq$eq(BoxesRunTime.boxToInteger(0), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1536));
                TripleEqualsSupport.Equalizer convertToEqualizer4 = this.convertToEqualizer(BoxesRunTime.boxToDouble(new StringOps(Predef$.MODULE$.augmentString((String) metrics.get("avgOffsetsBehindLatest"))).toDouble()));
                return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer4, "===", BoxesRunTime.boxToInteger(0), convertToEqualizer4.$eq$eq$eq(BoxesRunTime.boxToInteger(0), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1537));
            })}));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1502));
        // Test: KafkaMicroBatchStream.metrics called directly with an absent offset, with a
        // present offset, and with a null latest-offsets map.
        test("test custom metrics - corner cases", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            TopicPartition topicPartition = new TopicPartition(this.newTopic(), 0);
            TopicPartition topicPartition2 = new TopicPartition(this.newTopic(), 0);
            // Map of the two partitions to 3 and 6, passed as the second argument of metrics(...).
            scala.collection.immutable.Map apply = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(topicPartition, BoxesRunTime.boxToLong(3L)), new Tuple2(topicPartition2, BoxesRunTime.boxToLong(6L))}));
            // Case 1: an empty Optional offset must yield empty metrics.
            Map metrics = KafkaMicroBatchStream$.MODULE$.metrics(Optional.ofNullable(null), apply);
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(metrics, "isEmpty", metrics.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1548));
            // Case 2: offsets 1 and 2 against 3 and 6 must give min "2", max "4", avg "3.0".
            KafkaSourceOffset kafkaSourceOffset = new KafkaSourceOffset(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(topicPartition, BoxesRunTime.boxToLong(1L)), new Tuple2(topicPartition2, BoxesRunTime.boxToLong(2L))})));
            TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(KafkaMicroBatchStream$.MODULE$.metrics(Optional.ofNullable(kafkaSourceOffset), apply));
            Map map = (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("minOffsetsBehindLatest"), "2"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("maxOffsetsBehindLatest"), "4"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("avgOffsetsBehindLatest"), "3.0")}))).asJava();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", map, convertToEqualizer.$eq$eq$eq(map, Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1553));
            // Case 3: a null second argument must yield empty metrics.
            Map metrics2 = KafkaMicroBatchStream$.MODULE$.metrics(Optional.ofNullable(kafkaSourceOffset), (scala.collection.immutable.Map) null);
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(metrics2, "isEmpty", metrics2.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1561));
        }, new Position("KafkaMicroBatchSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 1542));
    }
}
