package org.apache.spark.sql.kafka010;

import java.io.File;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.kafka010.KafkaContinuousTest;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.test.SharedSparkSession;
import org.apache.spark.sql.test.TestSparkSession;
import org.apache.spark.util.Utils$;
import org.scalactic.source.Position;
import org.scalatest.Tag;
import org.scalatest.time.Span;
import org.scalatest.time.SpanSugar$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.ObjectRef;

/* compiled from: KafkaContinuousSinkSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-a\u0001B\u0001\u0003\u00015\u0011\u0001dS1gW\u0006\u001cuN\u001c;j]V|Wo]*j].\u001cV/\u001b;f\u0015\t\u0019A!\u0001\u0005lC\u001a\\\u0017\rM\u00191\u0015\t)a!A\u0002tc2T!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u0002\u0001'\r\u0001aB\u0005\t\u0003\u001fAi\u0011AA\u0005\u0003#\t\u0011qbS1gW\u0006\u001cv.\u001e:dKR+7\u000f\u001e\t\u0003\u001fMI!\u0001\u0006\u0002\u0003'-\u000bgm[1D_:$\u0018N\\;pkN$Vm\u001d;\t\u000bY\u0001A\u0011A\f\u0002\rqJg.\u001b;?)\u0005A\u0002CA\b\u0001\u0011\u001dQ\u0002A1A\u0005Bm\t\u0001c\u001d;sK\u0006l\u0017N\\4US6,w.\u001e;\u0016\u0003q\u0001\"!\b\u0012\u000e\u0003yQ!a\b\u0011\u0002\tQLW.\u001a\u0006\u0003C)\t\u0011b]2bY\u0006$Xm\u001d;\n\u0005\rr\"\u0001B*qC:Da!\n\u0001!\u0002\u0013a\u0012!E:ue\u0016\fW.\u001b8h)&lWm\\;uA!)q\u0005\u0001C!Q\u0005I!-\u001a4pe\u0016\fE\u000e\u001c\u000b\u0002SA\u0011!&L\u0007\u0002W)\tA&A\u0003tG\u0006d\u0017-\u0003\u0002/W\t!QK\\5u\u0011\u0015\u0001\u0004\u0001\"\u0011)\u0003!\tg\r^3s\u00032d\u0007\"\u0002\u001a\u0001\t\u0013\u0019\u0014!E2sK\u0006$XmS1gW\u0006\u0014V-\u00193feR\u0011AG\u0012\t\u0003k\rs!AN!\u000f\u0005]\u0002eB\u0001\u001d@\u001d\tIdH\u0004\u0002;{5\t1H\u0003\u0002=\u0019\u00051AH]8pizJ\u0011aC\u0005\u0003\u0013)I!a\u0002\u0005\n\u0005\u00151\u0011B\u0001\"\u0005\u0003\u001d\u0001\u0018mY6bO\u0016L!\u0001R#\u0003\u0013\u0011\u000bG/\u0019$sC6,'B\u0001\"\u0005\u0011\u00159\u0015\u00071\u0001I\u0003\u0015!x\u000e]5d!\tIEJ\u0004\u0002+\u0015&\u00111jK\u0001\u0007!J,G-\u001a4\n\u00055s%AB*ue&twM\u0003\u0002LW!)\u0001\u000b\u0001C\u0005#\u0006\t2M]3bi\u0016\\\u0015MZ6b/JLG/\u001a:\u0015\u000bIs\u0006-Z6\u0015\u0005MK\u0006C\u0001+X\u001b\u0005)&B\u0001,\u0005\u0003%\u0019HO]3b[&tw-\u0003\u0002Y+\nq1\u000b\u001e:fC6LgnZ)vKJL\b\"\u0002.P\u0001\u0004Y\u0016AD<ji\"\u001cV\r\\3di\u0016C\bO\u001d\t\u0004UqC\u0015BA/,\u0005)a$/\u001a9fCR,GM\u0010\u0005\u0006?>\u0003\r\u0001N\u0001\u0006S:\u0004X\u000f\u001e\u0005\bC>\u0003\n\u00111\u0001c\u0003%9\u0018\u000e\u001e5U_BL7\rE\u0002+G\"K!\u0001Z\u0016\u0003\r=\u0003H/[8o\u0011\u001d1w\n%AA\u0002\u001d\fab^5uQ>+H\u000f];u\u001b>$W\rE\u0002+G\"\u0004\"\u0001V5\n\u0005),&AC(viB,H/T8eK\"9An\u0014I\u0001\u0002\u0004i\u0017aC<ji\"|\u0005\u000f^5p]N\u0004B!\u00138I\u0011&\u0011qN\u0014\u0002\u0004\u001b\u0006\u0004\bbB9\u0001#\u0003%IA]\u0001\u001cGJ,\u0017\r^3LC\u001a\\\u0017m\u0016:ji\u0016\u0014H\u0005Z3gCVdG\u000f\n\u001a\u0016\u0003MT#A\u0019;,\u0003U\u0004\"A^>\u000e\u0003]T!\u0001_=\u0002\u0013Ut7\r[3dW\u0016$'B\u0001>,\u0003)\tgN\\8uCRLwN\\\u0005\u0003y^\u0014\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0011\u001dq\b!%A\u0005\n}\f1d\u0019:fCR,7*\u00194lC^\u0013\u0018\u000e^3sI\u0011,g-Y;mi\u0012\u001aTCAA\u0001U\t9G\u000fC\u0005\u0002\u0006\u0001\t\n\u0011\"\u0003\u0002\b\u0005Y2M]3bi\u0016\\\u0015MZ6b/JLG/\u001a:%I\u00164\u0017-\u001e7uIQ*\"!!\u0003+\u00055$\b")
/* loaded from: input_file:org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.class */
public class KafkaContinuousSinkSuite extends KafkaSourceTest implements KafkaContinuousTest {
    private final Span streamingTimeout;
    private final Trigger defaultTrigger;
    private final boolean defaultUseV2Sink;
    private final SparkListener org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener;

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public Trigger defaultTrigger() {
        return this.defaultTrigger;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public boolean defaultUseV2Sink() {
        return this.defaultUseV2Sink;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public SparkListener org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener() {
        return this.org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public /* synthetic */ void org$apache$spark$sql$kafka010$KafkaContinuousTest$$super$beforeEach() {
        SharedSparkSession.class.beforeEach(this);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public /* synthetic */ void org$apache$spark$sql$kafka010$KafkaContinuousTest$$super$afterEach() {
        SharedSparkSession.class.afterEach(this);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$defaultTrigger_$eq(Trigger trigger) {
        this.defaultTrigger = trigger;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$defaultUseV2Sink_$eq(boolean z) {
        this.defaultUseV2Sink = z;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener_$eq(SparkListener sparkListener) {
        this.org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener = sparkListener;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public TestSparkSession createSparkSession() {
        return KafkaContinuousTest.Cclass.createSparkSession(this);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void setTopicPartitions(String str, int i, StreamExecution streamExecution) {
        KafkaContinuousTest.Cclass.setTopicPartitions(this, str, i, streamExecution);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void beforeEach() {
        KafkaContinuousTest.Cclass.beforeEach(this);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void afterEach() {
        KafkaContinuousTest.Cclass.afterEach(this);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest
    public Span streamingTimeout() {
        return this.streamingTimeout;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest
    public void beforeAll() {
        super.beforeAll();
        testUtils_$eq(new KafkaTestUtils(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("auto.create.topics.enable"), "false")}))));
        testUtils().setup();
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest
    public void afterAll() {
        if (testUtils() != null) {
            testUtils().teardown();
            testUtils_$eq(null);
        }
        super.afterAll();
    }

    public Dataset<Row> org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaReader(String str) {
        return spark().read().format("kafka").option("kafka.bootstrap.servers", testUtils().brokerAddress()).option("startingOffsets", "earliest").option("endingOffsets", "latest").option("subscribe", str).load();
    }

    public StreamingQuery org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter(Dataset<Row> dataset, Option<String> option, Option<OutputMode> option2, Map<String, String> map, Seq<String> seq) {
        ObjectRef create = ObjectRef.create((Object) null);
        File createTempDir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2());
        Dataset df = dataset.toDF();
        if (seq.length() > 0) {
            df = df.selectExpr(seq);
        }
        create.elem = df.writeStream().format("kafka").option("checkpointLocation", createTempDir.getCanonicalPath()).option("kafka.bootstrap.servers", testUtils().brokerAddress()).option("kafka.max.block.ms", "1000").trigger(Trigger.Continuous(1000L)).queryName("kafkaStream");
        option.foreach(new KafkaContinuousSinkSuite$$anonfun$org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$1(this, create));
        option2.foreach(new KafkaContinuousSinkSuite$$anonfun$org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$2(this, create));
        map.foreach(new KafkaContinuousSinkSuite$$anonfun$org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$3(this, create));
        return ((DataStreamWriter) create.elem).start();
    }

    public Option<String> org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$default$2() {
        return None$.MODULE$;
    }

    public Option<OutputMode> org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$default$3() {
        return None$.MODULE$;
    }

    public Map<String, String> org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$default$4() {
        return Predef$.MODULE$.Map().apply(Nil$.MODULE$);
    }

    public KafkaContinuousSinkSuite() {
        KafkaContinuousTest.Cclass.$init$(this);
        this.streamingTimeout = SpanSugar$.MODULE$.convertIntToGrainOfTime(30).seconds();
        test("streaming - write to kafka with topic field", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$1(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 58));
        test("streaming - write w/o topic field, with topic option", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$3(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 99));
        test("streaming - topic field and topic option", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$5(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 139));
        test("null topic attribute", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$7(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 187));
        test("streaming - write data with bad schema", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$8(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 221));
        test("streaming - write data with valid schema but wrong types", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$9(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 271));
        test("streaming - write to non-existing topic", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$10(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 336));
        test("streaming - exception on config serializer", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$11(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 366));
        test("generic - write big data with small producer buffer", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$12(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 408));
    }
}
