package org.apache.spark.sql.kafka010;

import java.io.File;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.kafka010.KafkaContinuousTest;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.test.SharedSparkSession;
import org.apache.spark.sql.test.TestSparkSession;
import org.apache.spark.util.Utils$;
import org.scalactic.source.Position;
import org.scalatest.Tag;
import org.scalatest.time.Span;
import org.scalatest.time.SpanSugar$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.ObjectRef;

/* compiled from: KafkaContinuousSinkSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055b\u0001B\u0001\u0003\u00015\u0011\u0001dS1gW\u0006\u001cuN\u001c;j]V|Wo]*j].\u001cV/\u001b;f\u0015\t\u0019A!\u0001\u0005lC\u001a\\\u0017\rM\u00191\u0015\t)a!A\u0002tc2T!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u0002\u0001'\r\u0001aB\u0005\t\u0003\u001fAi\u0011AA\u0005\u0003#\t\u0011qbS1gW\u0006\u001cv.\u001e:dKR+7\u000f\u001e\t\u0003\u001fMI!\u0001\u0006\u0002\u0003'-\u000bgm[1D_:$\u0018N\\;pkN$Vm\u001d;\t\u000bY\u0001A\u0011A\f\u0002\rqJg.\u001b;?)\u0005A\u0002CA\b\u0001\u0011\u001dQ\u0002A1A\u0005Bm\t\u0001c\u001d;sK\u0006l\u0017N\\4US6,w.\u001e;\u0016\u0003q\u0001\"!\b\u0012\u000e\u0003yQ!a\b\u0011\u0002\tQLW.\u001a\u0006\u0003C)\t\u0011b]2bY\u0006$Xm\u001d;\n\u0005\rr\"\u0001B*qC:Da!\n\u0001!\u0002\u0013a\u0012!E:ue\u0016\fW.\u001b8h)&lWm\\;uA!9q\u0005\u0001b\u0001\n\u0003B\u0013a\u00032s_.,'\u000f\u0015:paN,\u0012!\u000b\t\u0005UE\u001a4'D\u0001,\u0015\taS&A\u0005j[6,H/\u00192mK*\u0011afL\u0001\u000bG>dG.Z2uS>t'\"\u0001\u0019\u0002\u000bM\u001c\u0017\r\\1\n\u0005IZ#aA'baB\u0011A'O\u0007\u0002k)\u0011agN\u0001\u0005Y\u0006twMC\u00019\u0003\u0011Q\u0017M^1\n\u0005i*$AB*ue&tw\r\u0003\u0004=\u0001\u0001\u0006I!K\u0001\rEJ|7.\u001a:Qe>\u00048\u000f\t\u0005\u0006}\u0001!\teP\u0001\tC\u001a$XM]!mYR\t\u0001\t\u0005\u0002B\u00056\tq&\u0003\u0002D_\t!QK\\5u\u0011\u0015)\u0005\u0001\"\u0003G\u0003E\u0019'/Z1uK.\u000bgm[1SK\u0006$WM\u001d\u000b\u0003\u000ff\u0003\"\u0001\u0013,\u000f\u0005%#fB\u0001&T\u001d\tY%K\u0004\u0002M#:\u0011Q\nU\u0007\u0002\u001d*\u0011q\nD\u0001\u0007yI|w\u000e\u001e 
\n\u0003-I!!\u0003\u0006\n\u0005\u001dA\u0011BA\u0003\u0007\u0013\t)F!A\u0004qC\u000e\\\u0017mZ3\n\u0005]C&!\u0003#bi\u00064%/Y7f\u0015\t)F\u0001C\u0003[\t\u0002\u00071,A\u0003u_BL7\r\u0005\u0002]?:\u0011\u0011)X\u0005\u0003=>\na\u0001\u0015:fI\u00164\u0017B\u0001\u001ea\u0015\tqv\u0006C\u0003c\u0001\u0011%1-A\tde\u0016\fG/Z&bM.\fwK]5uKJ$R\u0001\u001a9sov$\"!Z6\u0011\u0005\u0019LW\"A4\u000b\u0005!$\u0011!C:ue\u0016\fW.\u001b8h\u0013\tQwM\u0001\bTiJ,\u0017-\\5oOF+XM]=\t\u000b1\f\u0007\u0019A7\u0002\u001d]LG\u000f[*fY\u0016\u001cG/\u0012=qeB\u0019\u0011I\\.\n\u0005=|#A\u0003\u001fsKB,\u0017\r^3e}!)\u0011/\u0019a\u0001\u000f\u0006)\u0011N\u001c9vi\"91/\u0019I\u0001\u0002\u0004!\u0018!C<ji\"$v\u000e]5d!\r\tUoW\u0005\u0003m>\u0012aa\u00149uS>t\u0007b\u0002=b!\u0003\u0005\r!_\u0001\u000fo&$\bnT;uaV$Xj\u001c3f!\r\tUO\u001f\t\u0003MnL!\u0001`4\u0003\u0015=+H\u000f];u\u001b>$W\rC\u0004\u007fCB\u0005\t\u0019A@\u0002\u0017]LG\u000f[(qi&|gn\u001d\t\u00069\u0006\u00051lW\u0005\u0003e\u0001D\u0011\"!\u0002\u0001#\u0003%I!a\u0002\u00027\r\u0014X-\u0019;f\u0017\u000647.Y,sSR,'\u000f\n3fM\u0006,H\u000e\u001e\u00133+\t\tIAK\u0002u\u0003\u0017Y#!!\u0004\u0011\t\u0005=\u0011\u0011D\u0007\u0003\u0003#QA!a\u0005\u0002\u0016\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003/y\u0013AC1o]>$\u0018\r^5p]&!\u00111DA\t\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a\u0005\n\u0003?\u0001\u0011\u0013!C\u0005\u0003C\t1d\u0019:fCR,7*\u00194lC^\u0013\u0018\u000e^3sI\u0011,g-Y;mi\u0012\u001aTCAA\u0012U\rI\u00181\u0002\u0005\n\u0003O\u0001\u0011\u0013!C\u0005\u0003S\t1d\u0019:fCR,7*\u00194lC^\u0013\u0018\u000e^3sI\u0011,g-Y;mi\u0012\"TCAA\u0016U\ry\u00181\u0002")
/* loaded from: input_file:org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.class */
/**
 * Java view of the Scala test suite {@code KafkaContinuousSinkSuite}.
 *
 * <p>The class extends {@link KafkaSourceTest} and implements {@link KafkaContinuousTest}.
 * Its constructor registers nine tests by name; the body of each test lives in a separate
 * {@code KafkaContinuousSinkSuite$$anonfun$N} class that is not part of this file.
 *
 * <p>NOTE(review): member names such as {@code org$apache$...$$createKafkaWriter} and the
 * {@code KafkaContinuousTest.Cclass} forwarders look compiler-generated, so this file appears
 * to be derived from the compiled class rather than hand-written — confirm before editing.
 */
public class KafkaContinuousSinkSuite extends KafkaSourceTest implements KafkaContinuousTest {
    // Assigned in the constructor from a 30-second span.
    private final Span streamingTimeout;
    // Assigned in the constructor with the single entry "auto.create.topics.enable" -> "false".
    private final Map<String, String> brokerProps;
    // The next three fields are written only through the KafkaContinuousTest "_setter_" methods
    // below; this class never chooses their values itself.
    private final Trigger defaultTrigger;
    private final boolean defaultUseV2Sink;
    private final SparkListener org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener;

    /** Returns the trigger stored by the KafkaContinuousTest setter. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public Trigger defaultTrigger() {
        return this.defaultTrigger;
    }

    /** Returns the V2-sink flag stored by the KafkaContinuousTest setter. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public boolean defaultUseV2Sink() {
        return this.defaultUseV2Sink;
    }

    /** Returns the listener stored by the KafkaContinuousTest setter. */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public SparkListener org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener() {
        return this.org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener;
    }

    /**
     * Accessor that lets KafkaContinuousTest reach the SharedSparkSession {@code beforeEach}.
     *
     * <p>NOTE(review): {@code SharedSparkSession.class.beforeEach(this)} is not a valid call in
     * hand-written Java; presumably it stands for a static call on the trait's implementation
     * class — confirm.
     */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public /* synthetic */ void org$apache$spark$sql$kafka010$KafkaContinuousTest$$super$beforeEach() {
        SharedSparkSession.class.beforeEach(this);
    }

    /**
     * Accessor that lets KafkaContinuousTest reach the SharedSparkSession {@code afterEach}.
     *
     * <p>NOTE(review): same caveat as the {@code super$beforeEach} accessor — the call form is
     * not valid hand-written Java; confirm what it stands for.
     */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public /* synthetic */ void org$apache$spark$sql$kafka010$KafkaContinuousTest$$super$afterEach() {
        SharedSparkSession.class.afterEach(this);
    }

    /**
     * Stores the default trigger on behalf of KafkaContinuousTest.
     *
     * <p>NOTE(review): assigns a field declared {@code final}, which Java source only permits
     * inside a constructor or initializer; presumably a by-product of how this file was
     * produced — confirm.
     */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$defaultTrigger_$eq(Trigger trigger) {
        this.defaultTrigger = trigger;
    }

    /**
     * Stores the V2-sink flag on behalf of KafkaContinuousTest.
     *
     * <p>NOTE(review): assigns a {@code final} field outside a constructor — confirm.
     */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$defaultUseV2Sink_$eq(boolean z) {
        this.defaultUseV2Sink = z;
    }

    /**
     * Stores the tasks-ended listener on behalf of KafkaContinuousTest.
     *
     * <p>NOTE(review): assigns a {@code final} field outside a constructor — confirm.
     */
    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener_$eq(SparkListener sparkListener) {
        this.org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener = sparkListener;
    }

    /** Delegates session creation to the KafkaContinuousTest implementation. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public TestSparkSession createSparkSession() {
        return KafkaContinuousTest.Cclass.createSparkSession(this);
    }

    /** Delegates to the KafkaContinuousTest implementation, passing all arguments through unchanged. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void setTopicPartitions(String str, int i, StreamExecution streamExecution) {
        KafkaContinuousTest.Cclass.setTopicPartitions(this, str, i, streamExecution);
    }

    /** Delegates per-test setup to the KafkaContinuousTest implementation. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void beforeEach() {
        KafkaContinuousTest.Cclass.beforeEach(this);
    }

    /** Delegates per-test cleanup to the KafkaContinuousTest implementation. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void afterEach() {
        KafkaContinuousTest.Cclass.afterEach(this);
    }

    /** Returns the timeout assigned in the constructor (a 30-second span). */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest
    public Span streamingTimeout() {
        return this.streamingTimeout;
    }

    /** Returns the broker property map assigned in the constructor. */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest
    public Map<String, String> brokerProps() {
        return this.brokerProps;
    }

    /**
     * Tears down the Kafka test utilities, if any are still set, and then runs the
     * superclass cleanup.
     */
    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaTest
    public void afterAll() {
        if (testUtils() != null) {
            testUtils().teardown();
            // Clear the reference so the null check above skips teardown if this runs again.
            testUtils_$eq(null);
        }
        super.afterAll();
    }

    /**
     * Builds a non-streaming ({@code spark().read()}) Kafka dataset for one subscription.
     *
     * @param str value passed as the "subscribe" option
     * @return the loaded dataset, configured with the test broker address,
     *         "startingOffsets" = "earliest" and "endingOffsets" = "latest"
     */
    public Dataset<Row> org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaReader(String str) {
        return spark().read().format("kafka").option("kafka.bootstrap.servers", testUtils().brokerAddress()).option("startingOffsets", "earliest").option("endingOffsets", "latest").option("subscribe", str).load();
    }

    /**
     * Configures a streaming Kafka writer for {@code dataset} and starts it.
     *
     * @param dataset input data; converted with {@code toDF()} before writing
     * @param option  optional string handed to a closure together with the writer holder;
     *                presumably the target topic — confirm against the closure class
     * @param option2 optional output mode handed to a closure together with the writer holder
     * @param map     string pairs handed to a closure together with the writer holder;
     *                presumably extra writer options — confirm against the closure class
     * @param seq     select expressions; applied through {@code selectExpr} only when non-empty
     * @return the query returned by {@code start()}
     */
    public StreamingQuery org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter(Dataset<Row> dataset, Option<String> option, Option<OutputMode> option2, Map<String, String> map, Seq<String> seq) {
        // Holder object so the closures below can be given access to the writer.
        ObjectRef create = ObjectRef.create((Object) null);
        // New temporary directory; its canonical path becomes the checkpoint location.
        File createTempDir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2());
        Dataset df = dataset.toDF();
        if (seq.length() > 0) {
            df = df.selectExpr(seq);
        }
        // Base writer: kafka format, test broker, continuous trigger, fixed query name "kafkaStream".
        create.elem = df.writeStream().format("kafka").option("checkpointLocation", createTempDir.getCanonicalPath()).option("kafka.bootstrap.servers", testUtils().brokerAddress()).option("kafka.max.block.ms", "1000").trigger(Trigger.Continuous(1000L)).queryName("kafkaStream");
        // The closure classes are defined outside this file; each receives the holder,
        // presumably to adjust the writer stored in it — confirm.
        option.foreach(new KafkaContinuousSinkSuite$$anonfun$org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$1(this, create));
        option2.foreach(new KafkaContinuousSinkSuite$$anonfun$org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$2(this, create));
        map.foreach(new KafkaContinuousSinkSuite$$anonfun$org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$3(this, create));
        return ((DataStreamWriter) create.elem).start();
    }

    /** Default for the second createKafkaWriter parameter: no value. */
    public Option<String> org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$default$2() {
        return None$.MODULE$;
    }

    /** Default for the third createKafkaWriter parameter: no value. */
    public Option<OutputMode> org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$default$3() {
        return None$.MODULE$;
    }

    /** Default for the fourth createKafkaWriter parameter: a map built from an empty list. */
    public Map<String, String> org$apache$spark$sql$kafka010$KafkaContinuousSinkSuite$$createKafkaWriter$default$4() {
        return Predef$.MODULE$.Map().apply(Nil$.MODULE$);
    }

    /**
     * Initialises the suite: runs the KafkaContinuousTest initialiser, sets the streaming
     * timeout and broker properties, then registers the nine tests.
     *
     * <p>Each {@code test(...)} call passes the test name, an empty tag array, the closure
     * holding the test body, and a {@link Position}. The final integer of each Position is
     * presumably the line of the test in KafkaContinuousSinkSuite.scala — confirm.
     */
    public KafkaContinuousSinkSuite() {
        // Trait initialiser; presumably this is what invokes the "_setter_" methods above — confirm.
        KafkaContinuousTest.Cclass.$init$(this);
        this.streamingTimeout = SpanSugar$.MODULE$.convertIntToGrainOfTime(30).seconds();
        // Single broker property: "auto.create.topics.enable" mapped to "false".
        this.brokerProps = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("auto.create.topics.enable"), "false")}));
        test("streaming - write to kafka with topic field", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$1(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 53));
        test("streaming - write w/o topic field, with topic option", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$3(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 94));
        test("streaming - topic field and topic option", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$5(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 134));
        test("null topic attribute", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$7(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 182));
        test("streaming - write data with bad schema", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$8(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 216));
        test("streaming - write data with valid schema but wrong types", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$9(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 266));
        test("streaming - write to non-existing topic", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$10(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 331));
        test("streaming - exception on config serializer", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$11(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 361));
        test("generic - write big data with small producer buffer", Predef$.MODULE$.wrapRefArray(new Tag[0]), new KafkaContinuousSinkSuite$$anonfun$12(this), new Position("KafkaContinuousSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 403));
    }
}
