package org.apache.spark.sql.execution.streaming;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.streaming.WriteToStream;
import org.apache.spark.sql.catalyst.trees.TreePattern$;
import org.apache.spark.sql.catalyst.trees.TreePatternBits;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$;
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource$;
import org.apache.spark.sql.streaming.StreamingQueryStatus;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Product;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.generic.GenericTraversableTemplate;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.NonLocalReturnControl;

/* compiled from: MicroBatchExecution.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]h\u0001B\u0012%\u0001EB\u0011B\u000e\u0001\u0003\u0002\u0003\u0006IaN\u001e\t\u0013q\u0002!\u0011!Q\u0001\nu\u0012\u0005\"C\"\u0001\u0005\u0003\u0005\u000b\u0011\u0002#K\u0011!Y\u0005A!A!\u0002\u0013a\u0005\u0002\u0003/\u0001\u0005\u0003\u0005\u000b\u0011B/\t\u000b\u0011\u0004A\u0011A3\t\u000f1\u0004\u0001\u0019!C\t[\"I\u0011\u0011\u0001\u0001A\u0002\u0013E\u00111\u0001\u0005\b\u0003#\u0001\u0001\u0015)\u0003o\u0011%\tY\u0002\u0001b\u0001\n\u0013\ti\u0002\u0003\u0005\u00028\u0001\u0001\u000b\u0011BA\u0010\u0011-\tI\u0004\u0001a\u0001\u0002\u0004%I!a\u000f\t\u0017\u0005\r\u0003\u00011AA\u0002\u0013%\u0011Q\t\u0005\f\u0003\u0013\u0002\u0001\u0019!A!B\u0013\ti\u0004\u0003\u0006\u0002L\u0001A)\u0019!C!\u0003\u001bB\u0011\"a\u0018\u0001\u0001\u0004%I!!\u0019\t\u0013\u0005%\u0004\u00011A\u0005\n\u0005-\u0004\u0002CA8\u0001\u0001\u0006K!a\u0019\t\u000f\u0005E\u0004\u0001\"\u0011\u0002t!9\u0011Q\u000f\u0001\u0005R\u0005M\u0004bBA<\u0001\u0011E\u0011\u0011\u0010\u0005\b\u0003\u007f\u0002A\u0011BAA\u0011\u001d\t9\t\u0001C\u0005\u0003CBq!!#\u0001\t\u0013\tY\tC\u0004\u0002\u0018\u0002!I!!'\t\u000f\u0005}\u0005\u0001\"\u0003\u0002\"\"A\u0011q\u0015\u0001\u0005\u0002!\nI\u000bC\u0007\u0002N\u0002\u0001\n1!A\u0001\n\u0013\tym\u000f\u0005\u000e\u0003#\u0004\u0001\u0013aA\u0001\u0002\u0013%\u00111\u001b&\b\u000f\u0005UG\u0005#\u0001\u0002X\u001a11\u0005\nE\u0001\u00033Da\u0001Z\u0010\u0005\u0002\u0005\u0005\b\"CAr?\t\u0007I\u0011AAs\u0011!\t)p\bQ\u0001\n\u0005\u001d(aE'jGJ|')\u0019;dQ\u0016CXmY;uS>t'BA\u0013'\u0003%\u0019HO]3b[&twM\u0003\u0002(Q\u0005IQ\r_3dkRLwN\u001c\u0006\u0003S)\n1a]9m\u0015\tYC&A\u0003ta\u0006\u00148N\u0003\u0002.]\u00051\u0011\r]1dQ\u0016T\u0011aL\u0001\u0004_J<7\u0001A\n\u0003\u0001I\u0002\"a\r\u001b\u000e\u0003\u0011J!!\u000e\u0013\u0003\u001fM#(/Z1n\u000bb,7-\u001e;j_:\fAb\u001d9be.\u001cVm]:j_:\u0004\"\u0001O\u001d\u000e\u0003!J!A\u000f\u0015\u0003\u0019M\u0003\u0018M]6TKN\u001c\u0018n\u001c8\n\u0005Y\"\u0014a\u0
002;sS\u001e<WM\u001d\t\u0003}\u0001k\u0011a\u0010\u0006\u0003K!J!!Q \u0003\u000fQ\u0013\u0018nZ4fe&\u0011A\bN\u0001\riJLwmZ3s\u00072|7m\u001b\t\u0003\u000b\"k\u0011A\u0012\u0006\u0003\u000f*\nA!\u001e;jY&\u0011\u0011J\u0012\u0002\u0006\u00072|7m[\u0005\u0003\u0007R\nA\"\u001a=ue\u0006|\u0005\u000f^5p]N\u0004B!\u0014,Z3:\u0011a\n\u0016\t\u0003\u001fJk\u0011\u0001\u0015\u0006\u0003#B\na\u0001\u0010:p_Rt$\"A*\u0002\u000bM\u001c\u0017\r\\1\n\u0005U\u0013\u0016A\u0002)sK\u0012,g-\u0003\u0002X1\n\u0019Q*\u00199\u000b\u0005U\u0013\u0006CA'[\u0013\tY\u0006L\u0001\u0004TiJLgnZ\u0001\u0005a2\fg\u000e\u0005\u0002_E6\tqL\u0003\u0002&A*\u0011\u0011\rK\u0001\tG\u0006$\u0018\r\\=ti&\u00111m\u0018\u0002\u000e/JLG/\u001a+p'R\u0014X-Y7\u0002\rqJg.\u001b;?)\u00191w\r[5kWB\u00111\u0007\u0001\u0005\u0006m\u0019\u0001\ra\u000e\u0005\u0006y\u0019\u0001\r!\u0010\u0005\u0006\u0007\u001a\u0001\r\u0001\u0012\u0005\u0006\u0017\u001a\u0001\r\u0001\u0014\u0005\u00069\u001a\u0001\r!X\u0001\bg>,(oY3t+\u0005q\u0007cA8uo:\u0011\u0001O\u001d\b\u0003\u001fFL\u0011aU\u0005\u0003gJ\u000bq\u0001]1dW\u0006<W-\u0003\u0002vm\n\u00191+Z9\u000b\u0005M\u0014\u0006C\u0001=\u007f\u001b\u0005I(BA\u0013{\u0015\tYH0\u0001\u0003sK\u0006$'BA?)\u0003%\u0019wN\u001c8fGR|'/\u0003\u0002��s\ny1\u000b]1sW\u0012\u000bG/Y*ue\u0016\fW.A\u0006t_V\u00148-Z:`I\u0015\fH\u0003BA\u0003\u0003\u001b\u0001B!a\u0002\u0002\n5\t!+C\u0002\u0002\fI\u0013A!\u00168ji\"A\u0011q\u0002\u0005\u0002\u0002\u0003\u0007a.A\u0002yIE\n\u0001b]8ve\u000e,7\u000f\t\u0015\u0004\u0013\u0005U\u0001\u0003BA\u0004\u0003/I1!!\u0007S\u0005!1x\u000e\\1uS2,\u0017a\u0004;sS\u001e<WM]#yK\u000e,Ho\u001c:\u0016\u0005\u0005}!\u0003CA\u0011\u0003K\tY#!\r\u0007\r\u0005\r\u0002\u0001AA\u0010\u00051a$/\u001a4j]\u0016lWM\u001c;?!\u0011\t9!a\n\n\u0007\u0005%\"KA\u0004Qe>$Wo\u0019;\u0011\t\u0005\u001d\u0011QF\u0005\u0004\u0003_\u0011&\u0001D*fe&\fG.\u001b>bE2,\u0007cA\u001a\u00024%\u0019\u0011Q\u0007\u0013\u0003\u001fQ\u0013\u0018nZ4fe\u0016CXmY;u_J\f\u0001\u0003\u001e:jO\u
001e,'/\u0012=fGV$xN\u001d\u0011\u0002!]\fG/\u001a:nCJ\\GK]1dW\u0016\u0014XCAA\u001f!\r\u0019\u0014qH\u0005\u0004\u0003\u0003\"#\u0001E,bi\u0016\u0014X.\u0019:l)J\f7m[3s\u0003Q9\u0018\r^3s[\u0006\u00148\u000e\u0016:bG.,'o\u0018\u0013fcR!\u0011QAA$\u0011%\ty!DA\u0001\u0002\u0004\ti$A\txCR,'/\\1sWR\u0013\u0018mY6fe\u0002\n1\u0002\\8hS\u000e\fG\u000e\u00157b]V\u0011\u0011q\n\t\u0005\u0003#\nY&\u0004\u0002\u0002T)!\u0011QKA,\u0003\u001dawnZ5dC2T1!!\u0017a\u0003\u0015\u0001H.\u00198t\u0013\u0011\ti&a\u0015\u0003\u00171{w-[2bYBc\u0017M\\\u0001\u001aSN\u001cUO\u001d:f]R\u0014\u0015\r^2i\u0007>t7\u000f\u001e:vGR,G-\u0006\u0002\u0002dA!\u0011qAA3\u0013\r\t9G\u0015\u0002\b\u0005>|G.Z1o\u0003uI7oQ;se\u0016tGOQ1uG\"\u001cuN\\:ueV\u001cG/\u001a3`I\u0015\fH\u0003BA\u0003\u0003[B\u0011\"a\u0004\u0012\u0003\u0003\u0005\r!a\u0019\u00025%\u001c8)\u001e:sK:$()\u0019;dQ\u000e{gn\u001d;sk\u000e$X\r\u001a\u0011\u0002\tM$x\u000e\u001d\u000b\u0003\u0003\u000b\tAb\u001d;beR$&/[4hKJ\f!C];o\u0003\u000e$\u0018N^1uK\u0012\u001cFO]3b[R!\u0011QAA>\u0011\u0019\ti(\u0006a\u0001o\u0005)2\u000f]1sWN+7o]5p]\u001a{'o\u0015;sK\u0006l\u0017\u0001\u00069paVd\u0017\r^3Ti\u0006\u0014Ho\u00144gg\u0016$8\u000f\u0006\u0003\u0002\u0006\u0005\r\u0005BBAC-\u0001\u0007q'\u0001\rta\u0006\u00148nU3tg&|g\u000eV8Sk:\u0014\u0015\r^2iKN\f!#[:OK^$\u0015\r^1Bm\u0006LG.\u00192mK\u0006qq-\u001a;Ti\u0006\u0014Ho\u00144gg\u0016$H\u0003BAG\u0003'\u00032\u0001_AH\u0013\r\t\t*\u001f\u0002\u0007\u001f\u001a47/\u001a;\t\r\u0005U\u0005\u00041\u0001x\u0003)!\u0017\r^1TiJ,\u0017-\\\u0001\u0013G>t7\u000f\u001e:vGRtU\r\u001f;CCR\u001c\u0007\u000e\u0006\u0003\u0002d\u0005m\u0005bBAO3\u0001\u0007\u00111M\u0001\u0015]>$\u0015\r^1CCR\u001c\u0007.Z:F]\u0006\u0014G.\u001a3\u0002\u0011I,hNQ1uG\"$B!!\u0002\u0002$\"1\u0011Q\u0015\u000eA\u0002]\nac\u001d9be.\u001cVm]:j_:$vNU;o\u0005\u0006$8\r[\u0001\u0013o&$\b\u000e\u0015:pOJ,7o\u001d'pG.,G-\u0006\u0003\u0002,\u0006EF\u0003BAW\u0003\u0007\u0004B!a,\u000222\u0001AaBAZ7\t\u0007\u0011Q\u0017\u0002\
u0002)F!\u0011qWA_!\u0011\t9!!/\n\u0007\u0005m&KA\u0004O_RD\u0017N\\4\u0011\t\u0005\u001d\u0011qX\u0005\u0004\u0003\u0003\u0014&aA!os\"A\u0011QY\u000e\u0005\u0002\u0004\t9-A\u0001g!\u0019\t9!!3\u0002.&\u0019\u00111\u001a*\u0003\u0011q\u0012\u0017P\\1nKz\n!c];qKJ$3\u000f]1sWN+7o]5p]V\tq'\u0001\ntkB,'\u000f\n;sS\u001e<WM]\"m_\u000e\\W#\u0001#\u0002'5K7M]8CCR\u001c\u0007.\u0012=fGV$\u0018n\u001c8\u0011\u0005Mz2cA\u0010\u0002\\B!\u0011qAAo\u0013\r\tyN\u0015\u0002\u0007\u0003:L(+\u001a4\u0015\u0005\u0005]\u0017\u0001\u0004\"B)\u000eCu,\u0013#`\u0017\u0016KVCAAt!\u0011\tI/a=\u000e\u0005\u0005-(\u0002BAw\u0003_\fA\u0001\\1oO*\u0011\u0011\u0011_\u0001\u0005U\u00064\u0018-C\u0002\\\u0003W\fQBQ!U\u0007\"{\u0016\nR0L\u000bf\u0003\u0003")
/* loaded from: input_file:org/apache/spark/sql/execution/streaming/MicroBatchExecution.class */
public class MicroBatchExecution extends StreamExecution {
    // Lazily computed plan: written by logicalPlan$lzycompute() and read through logicalPlan().
    private LogicalPlan logicalPlan;
    // Passed to WriteToMicroBatchDataSource while the plan is built; logicalPlan$lzycompute() sets it to null afterwards.
    private Map<String, String> extraOptions;
    // Its catalogAndIdent() is read when the write node is built in logicalPlan$lzycompute().
    private final WriteToStream plan;
    // Streams collected from the transformed plan; assigned through sources_$eq.
    private volatile Seq<SparkDataStream> sources;
    // Drives runActivatedStream via execute(...); its runtime class (SingleBatchExecutor / MultiBatchExecutor / other)
    // is inspected when read limits are chosen and when deciding whether to terminate after a trigger.
    private final Product triggerExecutor;
    // Assigned through watermarkTracker_$eq (e.g. when populateStartOffsets starts a new query).
    private WatermarkTracker watermarkTracker;
    // Whether a batch has been constructed for the current batch id and not yet reset by the trigger loop.
    private boolean isCurrentBatchConstructed;
    // Lazy-initialisation flag for logicalPlan: set to true once logicalPlan$lzycompute() has stored the plan.
    private volatile boolean bitmap$0;

    /**
     * Static forwarder to the companion object's {@code BATCH_ID_KEY()}.
     *
     * @return the value returned by {@code MicroBatchExecution$.MODULE$.BATCH_ID_KEY()}
     */
    public static String BATCH_ID_KEY() {
        final String batchIdKey = MicroBatchExecution$.MODULE$.BATCH_ID_KEY();
        return batchIdKey;
    }

    /**
     * Synthetic accessor that exposes the superclass's {@code sparkSession()} under a mangled name,
     * so that lambdas in this class can reach it.
     *
     * @return whatever {@code super.sparkSession()} returns
     */
    public /* synthetic */ SparkSession org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession() {
        final SparkSession inheritedSession = super.sparkSession();
        return inheritedSession;
    }

    /**
     * Synthetic accessor forwarding to the superclass's {@code triggerClock()}.
     *
     * @return whatever {@code super.triggerClock()} returns
     */
    private /* synthetic */ Clock super$triggerClock() {
        final Clock inheritedClock = super.triggerClock();
        return inheritedClock;
    }

    /**
     * Getter for the volatile {@code sources} field.
     *
     * @return the sequence of streams most recently stored through {@code sources_$eq}
     */
    @Override // org.apache.spark.sql.execution.streaming.ProgressReporter
    public Seq<SparkDataStream> sources() {
        return sources;
    }

    /**
     * Setter for the volatile {@code sources} field.
     *
     * @param seq the new sequence of streams to store
     */
    public void sources_$eq(Seq<SparkDataStream> seq) {
        sources = seq;
    }

    /**
     * Getter for the final {@code triggerExecutor} field.
     *
     * @return the trigger executor this instance was created with
     */
    private Product triggerExecutor() {
        return triggerExecutor;
    }

    /**
     * Getter for the {@code watermarkTracker} field.
     *
     * @return the tracker most recently stored through {@code watermarkTracker_$eq}
     */
    private WatermarkTracker watermarkTracker() {
        return watermarkTracker;
    }

    /**
     * Setter for the {@code watermarkTracker} field.
     *
     * @param watermarkTracker the tracker to store; it shadows the field, hence the explicit {@code this.}
     */
    private void watermarkTracker_$eq(WatermarkTracker watermarkTracker) {
        this.watermarkTracker = watermarkTracker;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9, types: [org.apache.spark.sql.execution.streaming.MicroBatchExecution] */
    /**
     * Slow path of the lazily initialised {@code logicalPlan}.
     *
     * <p>While holding this object's monitor, and only if {@code bitmap$0} is still false, it:
     * <ol>
     *   <li>asserts that the caller is the query execution thread;</li>
     *   <li>transforms {@code analyzedPlan()} with {@code MicroBatchExecution$$anonfun$1} (defined elsewhere);</li>
     *   <li>stores the streams collected from the transformed plan via {@code sources_$eq};</li>
     *   <li>stores a source-to-{@link ReadLimit} map via {@code uniqueSources_$eq}, chosen by the
     *       runtime class of the trigger executor;</li>
     *   <li>wraps the transformed plan in a {@link WriteToMicroBatchDataSource} when the sink is a
     *       {@link SupportsWrite}, otherwise uses the transformed plan as is;</li>
     *   <li>stores the result and sets {@code bitmap$0} to true.</li>
     * </ol>
     * After leaving the monitor it sets {@code extraOptions} to null and returns the stored plan.
     *
     * @return the value of the {@code logicalPlan} field
     */
    private LogicalPlan logicalPlan$lzycompute() {
        LogicalPlan logicalPlan;
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                // The plan may only be initialised from the query execution thread.
                Predef$.MODULE$.assert(queryExecutionThread() == Thread.currentThread(), () -> {
                    return new StringBuilder(83).append("logicalPlan must be initialized in QueryExecutionThread ").append("but the current thread was ").append(Thread.currentThread()).toString();
                });
                // Mutable long starting at 0, shared with the transform function below
                // (presumably a running source id counter — TODO confirm against MicroBatchExecution$$anonfun$1).
                LongRef create = LongRef.create(0L);
                // Rewrite the analyzed plan. The anonymous function receives three fresh, empty mutable maps,
                // the counter above, and the disabled V2 micro-batch readers from the session conf.
                LogicalPlan transform = analyzedPlan().transform(new MicroBatchExecution$$anonfun$1(this, Map$.MODULE$.apply(Nil$.MODULE$), create, Utils$.MODULE$.stringToSeq(super.sparkSession().sqlContext().conf().disabledV2StreamingMicroBatchReaders()), Map$.MODULE$.apply(Nil$.MODULE$), Map$.MODULE$.apply(Nil$.MODULE$)));
                // Collect the streams referenced by the transformed plan.
                sources_$eq(transform.collect(new MicroBatchExecution$$anonfun$logicalPlan$lzycompute$1(null)));
                Product triggerExecutor = triggerExecutor();
                // Build the (distinct source -> read limit) map; three variants depending on the executor class.
                uniqueSources_$eq(triggerExecutor instanceof SingleBatchExecutor ? ((TraversableOnce) ((TraversableLike) sources().distinct()).map(sparkDataStream -> {
                    // SingleBatchExecutor: every source is mapped to ReadLimit.allAvailable().
                    Tuple2 $minus$greater$extension;
                    if (sparkDataStream instanceof SupportsAdmissionControl) {
                        SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl) sparkDataStream;
                        ReadLimit defaultReadLimit = supportsAdmissionControl.getDefaultReadLimit();
                        ReadLimit allAvailable = ReadLimit.allAvailable();
                        // Null-safe "defaultReadLimit != allAvailable": warn that the source's own limit is being overridden.
                        if (defaultReadLimit != null ? !defaultReadLimit.equals(allAvailable) : allAvailable != null) {
                            this.logWarning(() -> {
                                return new StringBuilder(58).append("The read limit ").append(defaultReadLimit).append(" for ").append(supportsAdmissionControl).append(" is ignored when Trigger.Once is used.").toString();
                            });
                        }
                        $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(supportsAdmissionControl), ReadLimit.allAvailable());
                    } else {
                        $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(sparkDataStream), ReadLimit.allAvailable());
                    }
                    return $minus$greater$extension;
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()) : triggerExecutor instanceof MultiBatchExecutor ? ((TraversableOnce) ((TraversableLike) ((TraversableLike) sources().distinct()).map(sparkDataStream2 -> {
                    // MultiBatchExecutor, step 1: make every source a SupportsTriggerAvailableNow, wrapping
                    // Source / MicroBatchStream instances that are not one already; anything else is a MatchError.
                    SupportsTriggerAvailableNow availableNowMicroBatchStreamWrapper;
                    if (sparkDataStream2 instanceof SupportsTriggerAvailableNow) {
                        availableNowMicroBatchStreamWrapper = (SupportsTriggerAvailableNow) sparkDataStream2;
                    } else if (sparkDataStream2 instanceof Source) {
                        availableNowMicroBatchStreamWrapper = new AvailableNowSourceWrapper((Source) sparkDataStream2);
                    } else {
                        if (!(sparkDataStream2 instanceof MicroBatchStream)) {
                            throw new MatchError(sparkDataStream2);
                        }
                        availableNowMicroBatchStreamWrapper = new AvailableNowMicroBatchStreamWrapper((MicroBatchStream) sparkDataStream2);
                    }
                    return availableNowMicroBatchStreamWrapper;
                }, Seq$.MODULE$.canBuildFrom())).map(supportsTriggerAvailableNow -> {
                    // MultiBatchExecutor, step 2: prepare the source, then pair it with its default read limit.
                    supportsTriggerAvailableNow.prepareForTriggerAvailableNow();
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(supportsTriggerAvailableNow), supportsTriggerAvailableNow.getDefaultReadLimit());
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()) : ((TraversableOnce) ((TraversableLike) sources().distinct()).map(sparkDataStream3 -> {
                    // Any other executor: admission-controlled sources keep their default limit, the rest read everything.
                    Tuple2 $minus$greater$extension;
                    if (sparkDataStream3 instanceof SupportsAdmissionControl) {
                        SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl) sparkDataStream3;
                        $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(supportsAdmissionControl), supportsAdmissionControl.getDefaultReadLimit());
                    } else {
                        $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(sparkDataStream3), ReadLimit.allAvailable());
                    }
                    return $minus$greater$extension;
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
                SupportsWrite sink = sink();
                if (sink instanceof SupportsWrite) {
                    // Writable sink: wrap the transformed plan in a write node. The optional relation is built
                    // from plan.catalogAndIdent() when that is defined.
                    SupportsWrite supportsWrite = sink;
                    logicalPlan = new WriteToMicroBatchDataSource(this.plan.catalogAndIdent().map(tuple2 -> {
                        if (tuple2 == null) {
                            throw new MatchError(tuple2);
                        }
                        return DataSourceV2Relation$.MODULE$.create(supportsWrite, new Some((TableCatalog) tuple2._1()), new Some((Identifier) tuple2._2()));
                    }), supportsWrite, transform, id().toString(), this.extraOptions, outputMode(), WriteToMicroBatchDataSource$.MODULE$.apply$default$7());
                } else {
                    // Any other sink: the transformed plan is used unchanged.
                    logicalPlan = transform;
                }
                this.logicalPlan = logicalPlan;
                r0 = this;
                // Mark the lazy value as initialised so logicalPlan() takes the fast path from now on.
                r0.bitmap$0 = true;
            }
        }
        // extraOptions was only needed to build the plan; drop the reference.
        this.extraOptions = null;
        return this.logicalPlan;
    }

    /**
     * Returns the lazily initialised logical plan.
     *
     * <p>If the {@code bitmap$0} flag is already set, the stored plan is returned directly;
     * otherwise the call is delegated to {@code logicalPlan$lzycompute()}.
     *
     * @return the logical plan for this execution
     */
    @Override // org.apache.spark.sql.execution.streaming.StreamExecution, org.apache.spark.sql.execution.streaming.ProgressReporter
    public LogicalPlan logicalPlan() {
        if (this.bitmap$0) {
            return this.logicalPlan;
        }
        return logicalPlan$lzycompute();
    }

    /**
     * Getter for the {@code isCurrentBatchConstructed} flag.
     *
     * @return the current value of the flag
     */
    private boolean isCurrentBatchConstructed() {
        return isCurrentBatchConstructed;
    }

    /**
     * Setter for the {@code isCurrentBatchConstructed} flag.
     *
     * @param z the new value of the flag
     */
    private void isCurrentBatchConstructed_$eq(boolean z) {
        isCurrentBatchConstructed = z;
    }

    /**
     * Stops this query.
     *
     * <p>The state is set to {@code TERMINATED} first. If the query execution thread is still alive,
     * the job group named after {@code runId()} is cancelled, the thread is interrupted and awaited,
     * and the job group is cancelled a second time. A log line is written in every case.
     */
    @Override // org.apache.spark.sql.streaming.StreamingQuery
    public void stop() {
        state().set(TERMINATED$.MODULE$);
        final boolean executionThreadAlive = queryExecutionThread().isAlive();
        if (executionThreadAlive) {
            // Cancel, interrupt-and-wait, then cancel once more after the thread has terminated.
            super.sparkSession().sparkContext().cancelJobGroup(runId().toString());
            interruptAndAwaitExecutionThreadTermination();
            super.sparkSession().sparkContext().cancelJobGroup(runId().toString());
        }
        logInfo(() -> "Query " + this.prettyIdString() + " was stopped");
    }

    /**
     * Begins a trigger: runs the inherited start-of-trigger logic, then replaces the current status
     * with a copy whose third component is {@code true}.
     *
     * <p>In Spark's {@code StreamingQueryStatus} the third component is the trigger-active flag
     * (NOTE(review): confirm against the status class used by this build).
     */
    @Override // org.apache.spark.sql.execution.streaming.StreamExecution, org.apache.spark.sql.execution.streaming.ProgressReporter
    public void startTrigger() {
        // Must be a super call: an unqualified startTrigger() dispatches back to this override
        // and recurses until the stack overflows.
        super.startTrigger();
        StreamingQueryStatus status = currentStatus();
        // Keep the first two components as they are and set only the third to true.
        currentStatus_$eq(status.copy(status.copy$default$1(), status.copy$default$2(), true));
    }

    /**
     * Runs the micro-batch loop by handing a per-trigger callback to the trigger executor.
     *
     * <p>Each invocation of the callback, while the query is active: starts a trigger, initialises
     * start offsets on the first pass, constructs the next batch if none is pending, records the
     * trigger offsets, runs the batch when one was constructed, finishes the trigger, signals
     * threads waiting on the progress lock condition, and then either advances the batch id,
     * terminates (for a {@code MultiBatchExecutor} with nothing left to do) or sleeps for the
     * polling delay. The callback returns {@code isActive()}.
     *
     * @param sparkSession session whose conf supplies the no-data-micro-batches setting and which is
     *                     passed to {@code populateStartOffsets} and {@code runBatch}
     */
    @Override // org.apache.spark.sql.execution.streaming.StreamExecution
    public void runActivatedStream(SparkSession sparkSession) {
        // Read once, outside the loop; passed to constructNextBatch on every trigger.
        boolean streamingNoDataMicroBatchesEnabled = sparkSession.sessionState().conf().streamingNoDataMicroBatchesEnabled();
        triggerExecutor().execute(() -> {
            if (this.isActive()) {
                // Holds whether the batch had new data, captured before the batch runs (see below).
                BooleanRef create = BooleanRef.create(false);
                this.startTrigger();
                this.reportTimeTaken("triggerExecution", () -> {
                    // A negative batch id triggers the one-off initialisation; populateStartOffsets assigns
                    // currentBatchId, so this branch is expected to be skipped afterwards.
                    if (this.currentBatchId() < 0) {
                        AcceptsLatestSeenOffsetHandler$.MODULE$.setLatestSeenOffsetOnSources(this.offsetLog().getLatest().map(tuple2 -> {
                            return (OffsetSeq) tuple2._2();
                        }), this.sources());
                        this.populateStartOffsets(sparkSession);
                        this.logInfo(() -> {
                            return new StringBuilder(20).append("Stream started from ").append(this.committedOffsets()).toString();
                        });
                    }
                    // Set before constructNextBatch so the description is already in place when it runs.
                    this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession().sparkContext().setJobDescription(this.getBatchDescriptionString());
                    this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession().sparkContext().setJobDoAsUser();
                    // Only construct a batch when none is pending (one may already be set by populateStartOffsets).
                    if (!this.isCurrentBatchConstructed()) {
                        this.isCurrentBatchConstructed_$eq(this.constructNextBatch(streamingNoDataMicroBatchesEnabled));
                    }
                    // Record the offset range for this trigger before the batch is processed.
                    this.recordTriggerOffsets(this.committedOffsets(), this.availableOffsets(), this.latestOffsets());
                    // Captured here, before runBatch, and passed to finishTrigger after the batch has run.
                    create.elem = this.isNewDataAvailable();
                    StreamingQueryStatus currentStatus = this.currentStatus();
                    // Replace only the second status component with the data-available result.
                    this.currentStatus_$eq(currentStatus.copy(currentStatus.copy$default$1(), this.isNewDataAvailable(), currentStatus.copy$default$3()));
                    if (!this.isCurrentBatchConstructed()) {
                        // No batch was constructed: update the status and skip runBatch for this trigger.
                        this.updateStatusMessage("Waiting for data to arrive");
                        return;
                    }
                    if (create.elem) {
                        this.updateStatusMessage("Processing new data");
                    } else {
                        this.updateStatusMessage("No new data but cleaning up state");
                    }
                    this.runBatch(sparkSession);
                });
                // Called after the timed section above has completed.
                this.finishTrigger(create.elem, this.isCurrentBatchConstructed());
                // Wake up threads waiting on the progress lock condition, after finishTrigger.
                this.withProgressLocked(() -> {
                    this.awaitProgressLockCondition().signalAll();
                });
                if (this.isCurrentBatchConstructed()) {
                    // A batch was executed: move to the next batch id and clear the flag.
                    this.currentBatchId_$eq(this.currentBatchId() + 1);
                    this.isCurrentBatchConstructed_$eq(false);
                } else if (this.triggerExecutor() instanceof MultiBatchExecutor) {
                    // Nothing left to process under a MultiBatchExecutor: terminate the query.
                    this.logInfo(() -> {
                        return "Finished processing all available data for the trigger, terminating this Trigger.AvailableNow query";
                    });
                    this.state().set(TERMINATED$.MODULE$);
                } else {
                    // No batch was constructed: wait for the polling delay before this callback returns.
                    Thread.sleep(this.pollingDelayMs());
                }
            }
            this.updateStatusMessage("Waiting for next trigger");
            // The callback's result is the query's active state.
            return this.isActive();
        });
    }

    /**
     * Initialises batch id, offsets and watermark tracker from the offset log and the commit log.
     *
     * <p>With no offset log entry, a new query is started at batch 0 with a fresh watermark tracker.
     * Otherwise the latest offset log entry is first assumed to be a batch that must be re-run; the
     * commit log is then consulted to decide whether that batch was in fact committed, in which case
     * execution moves on to the following batch id.
     *
     * @param sparkSession session whose conf is used to create the watermark tracker and which is
     *                     passed to the offset metadata handler
     * @throws IllegalStateException if the latest offset log batch is not 0 and the offset log has
     *                               no entry for the batch before it
     */
    private void populateStartOffsets(SparkSession sparkSession) {
        Tuple2 tuple2;
        Tuple2 tuple22;
        BoxedUnit boxedUnit;
        sinkCommitProgress_$eq(None$.MODULE$);
        Some latest = offsetLog().getLatest();
        if (!(latest instanceof Some) || (tuple2 = (Tuple2) latest.value()) == null) {
            if (!None$.MODULE$.equals(latest)) {
                throw new MatchError(latest);
            }
            // The offset log is empty: start from batch 0 with a new watermark tracker.
            logInfo(() -> {
                return "Starting new streaming query.";
            });
            currentBatchId_$eq(0L);
            watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSession.conf()));
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        // _1$mcJ$sp is the latest batch id in the offset log; offsetSeq holds its offsets.
        long _1$mcJ$sp = tuple2._1$mcJ$sp();
        OffsetSeq offsetSeq = (OffsetSeq) tuple2._2();
        // Start by assuming the latest logged batch has to be re-executed.
        currentBatchId_$eq(_1$mcJ$sp);
        isCurrentBatchConstructed_$eq(true);
        availableOffsets_$eq(offsetSeq.toStreamProgress(sources()));
        if (_1$mcJ$sp != 0) {
            // Committed offsets come from the previous batch's offset log entry, which must exist.
            committedOffsets_$eq(((OffsetSeq) offsetLog().get(_1$mcJ$sp - 1).getOrElse(() -> {
                this.logError(() -> {
                    return new StringBuilder(366).append("The offset log for batch ").append(_1$mcJ$sp - 1).append(" doesn't exist, ").append("which is required to restart the query from the latest batch ").append(_1$mcJ$sp).append(" ").append("from the offset log. Please ensure there are two subsequent offset logs ").append("available for the latest batch via manually deleting the offset file(s). ").append("Please also ensure the latest batch for commit log is equal or one batch ").append("earlier than the latest batch for offset log.").toString();
                });
                throw new IllegalStateException(new StringBuilder(20).append("batch ").append(_1$mcJ$sp - 1).append(" doesn't exist").toString());
            })).toStreamProgress(sources()));
        }
        // Apply the logged offset metadata, if any; the handler body is defined elsewhere.
        offsetSeq.metadata().foreach(offsetSeqMetadata -> {
            $anonfun$populateStartOffsets$3(this, sparkSession, offsetSeqMetadata);
            return BoxedUnit.UNIT;
        });
        Some latest2 = commitLog().getLatest();
        if ((latest2 instanceof Some) && (tuple22 = (Tuple2) latest2.value()) != null) {
            // _1$mcJ$sp2 is the latest batch id in the commit log.
            long _1$mcJ$sp2 = tuple22._1$mcJ$sp();
            CommitMetadata commitMetadata = (CommitMetadata) tuple22._2();
            if (_1$mcJ$sp == _1$mcJ$sp2) {
                // The latest logged batch was committed. For every Source paired with an
                // execution.streaming.Offset, call getBatch(committed, available); the returned
                // Dataset is not used by this method.
                availableOffsets().foreach(tuple23 -> {
                    Dataset<Row> dataset;
                    if (tuple23 != null) {
                        SparkDataStream sparkDataStream = (SparkDataStream) tuple23._1();
                        org.apache.spark.sql.connector.read.streaming.Offset offset = (org.apache.spark.sql.connector.read.streaming.Offset) tuple23._2();
                        if (sparkDataStream instanceof Source) {
                            Source source = (Source) sparkDataStream;
                            if (offset instanceof Offset) {
                                dataset = source.getBatch(this.committedOffsets().get((SparkDataStream) source).map(offset2 -> {
                                    return (Offset) offset2;
                                }), (Offset) offset);
                                return dataset;
                            }
                        }
                    }
                    // Any other kind of entry: nothing to do.
                    dataset = BoxedUnit.UNIT;
                    return dataset;
                });
                // Move on to the batch after the committed one; it still has to be constructed.
                currentBatchId_$eq(_1$mcJ$sp2 + 1);
                isCurrentBatchConstructed_$eq(false);
                // Fold the available offsets into the committed ones.
                // NOTE(review): m2307$plus$plus looks like a decompiler-renamed `$plus$plus` (Scala `++`) — confirm.
                committedOffsets_$eq(committedOffsets().m2307$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>) availableOffsets()));
                // Never move the watermark backwards: take the max of the current and the committed next-batch watermark.
                watermarkTracker().setWatermark(package$.MODULE$.max(watermarkTracker().currentWatermark(), commitMetadata.nextBatchWatermarkMs()));
                boxedUnit = BoxedUnit.UNIT;
            } else if (_1$mcJ$sp2 == _1$mcJ$sp - 1) {
                // The commit log trails the offset log by exactly one batch. For each Source entry, getBatch is
                // called only when there is no committed offset for the source or when the predicate
                // $anonfun$populateStartOffsets$8 (defined elsewhere) holds for (available, committed).
                availableOffsets().foreach(tuple24 -> {
                    Dataset<Row> dataset;
                    if (tuple24 != null) {
                        SparkDataStream sparkDataStream = (SparkDataStream) tuple24._1();
                        org.apache.spark.sql.connector.read.streaming.Offset offset = (org.apache.spark.sql.connector.read.streaming.Offset) tuple24._2();
                        if (sparkDataStream instanceof Source) {
                            Source source = (Source) sparkDataStream;
                            if (offset instanceof Offset) {
                                Offset offset2 = (Offset) offset;
                                Option<Offset> map = this.committedOffsets().get((SparkDataStream) source).map(offset3 -> {
                                    return (Offset) offset3;
                                });
                                dataset = BoxesRunTime.unboxToBoolean(map.map(offset4 -> {
                                    return BoxesRunTime.boxToBoolean($anonfun$populateStartOffsets$8(offset2, offset4));
                                }).getOrElse(() -> {
                                    return true;
                                })) ? source.getBatch(map, offset2) : BoxedUnit.UNIT;
                                return dataset;
                            }
                        }
                    }
                    // Any other kind of entry: nothing to do.
                    dataset = BoxedUnit.UNIT;
                    return dataset;
                });
                boxedUnit = BoxedUnit.UNIT;
            } else if (_1$mcJ$sp2 < _1$mcJ$sp - 1) {
                // The commit log is more than one batch behind the offset log: only warn.
                logWarning(() -> {
                    return new StringBuilder(79).append("Batch completion log latest batch id is ").append(_1$mcJ$sp2).append(", which is not trailing ").append("batchid ").append(_1$mcJ$sp).append(" by one").toString();
                });
                boxedUnit = BoxedUnit.UNIT;
            } else {
                // Commit log id is greater than the offset log id: no action is taken here.
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            if (!None$.MODULE$.equals(latest2)) {
                throw new MatchError(latest2);
            }
            // No commit log entry: keep the "re-execute latest batch" assumption made above.
            logInfo(() -> {
                return "no commit log present";
            });
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
        logInfo(() -> {
            return new StringBuilder(65).append("Resuming at batch ").append(this.currentBatchId()).append(" with committed offsets ").append(this.committedOffsets()).append(" and available offsets ").append(this.availableOffsets()).toString();
        });
        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
    }

    /**
     * Reports whether any entry of {@code availableOffsets()} satisfies the predicate
     * {@code $anonfun$isNewDataAvailable$1}, which is defined elsewhere in this class.
     *
     * @return {@code true} if at least one entry satisfies the predicate
     */
    private boolean isNewDataAvailable() {
        return availableOffsets().exists(
                offsetEntry -> BoxesRunTime.boxToBoolean($anonfun$isNewDataAvailable$1(this, offsetEntry)));
    }

    /**
     * Determines the start offset for a stream from {@code availableOffsets()}.
     *
     * <ul>
     *   <li>For a {@code Source}: the recorded offset, or {@code null} when none is recorded.</li>
     *   <li>For a {@link MicroBatchStream}: the recorded offset re-read through the stream's
     *       {@code deserializeOffset} from its JSON form, or the stream's {@code initialOffset()}
     *       when none is recorded.</li>
     * </ul>
     *
     * @param sparkDataStream the stream whose start offset is wanted
     * @return the start offset as described above
     * @throws MatchError if the stream is neither a {@code Source} nor a {@code MicroBatchStream}
     */
    private org.apache.spark.sql.connector.read.streaming.Offset getStartOffset(SparkDataStream sparkDataStream) {
        Option<org.apache.spark.sql.connector.read.streaming.Offset> recorded = availableOffsets().get(sparkDataStream);
        if (sparkDataStream instanceof Source) {
            return (org.apache.spark.sql.connector.read.streaming.Offset) recorded.orNull(Predef$.MODULE$.$conforms());
        }
        if (sparkDataStream instanceof MicroBatchStream) {
            MicroBatchStream stream = (MicroBatchStream) sparkDataStream;
            return (org.apache.spark.sql.connector.read.streaming.Offset) recorded
                    .map(logged -> stream.deserializeOffset(logged.json()))
                    .getOrElse(() -> stream.initialOffset());
        }
        throw new MatchError(sparkDataStream);
    }

    private boolean constructNextBatch(boolean z) {
        Object obj = new Object();
        try {
            return BoxesRunTime.unboxToBoolean(withProgressLocked(() -> {
                if (this.isCurrentBatchConstructed()) {
                    throw new NonLocalReturnControl.mcZ.sp(obj, true);
                }
                Tuple2 unzip = ((GenericTraversableTemplate) this.uniqueSources().toSeq().map(tuple2 -> {
                    Tuple2 tuple2;
                    if (tuple2 != null) {
                        AvailableNowDataStreamWrapper availableNowDataStreamWrapper = (SparkDataStream) tuple2._1();
                        ReadLimit readLimit = (ReadLimit) tuple2._2();
                        if (availableNowDataStreamWrapper instanceof AvailableNowDataStreamWrapper) {
                            AvailableNowDataStreamWrapper availableNowDataStreamWrapper2 = availableNowDataStreamWrapper;
                            this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(availableNowDataStreamWrapper2).toString());
                            SparkDataStream delegate = availableNowDataStreamWrapper2.delegate();
                            tuple2 = (Tuple2) this.reportTimeTaken("latestOffset", () -> {
                                return new Tuple2(new Tuple2(delegate, Option$.MODULE$.apply(availableNowDataStreamWrapper2.latestOffset(this.getStartOffset(delegate), readLimit))), new Tuple2(delegate, Option$.MODULE$.apply(availableNowDataStreamWrapper2.reportLatestOffset())));
                            });
                            return tuple2;
                        }
                    }
                    if (tuple2 != null) {
                        SupportsAdmissionControl supportsAdmissionControl = (SparkDataStream) tuple2._1();
                        ReadLimit readLimit2 = (ReadLimit) tuple2._2();
                        if (supportsAdmissionControl instanceof SupportsAdmissionControl) {
                            SupportsAdmissionControl supportsAdmissionControl2 = supportsAdmissionControl;
                            this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(supportsAdmissionControl2).toString());
                            tuple2 = (Tuple2) this.reportTimeTaken("latestOffset", () -> {
                                return new Tuple2(new Tuple2(supportsAdmissionControl2, Option$.MODULE$.apply(supportsAdmissionControl2.latestOffset(this.getStartOffset(supportsAdmissionControl2), readLimit2))), new Tuple2(supportsAdmissionControl2, Option$.MODULE$.apply(supportsAdmissionControl2.reportLatestOffset())));
                            });
                            return tuple2;
                        }
                    }
                    if (tuple2 != null) {
                        SparkDataStream sparkDataStream = (SparkDataStream) tuple2._1();
                        if (sparkDataStream instanceof Source) {
                            Source source = (Source) sparkDataStream;
                            this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(source).toString());
                            tuple2 = (Tuple2) this.reportTimeTaken("getOffset", () -> {
                                Option<Offset> offset = source.getOffset();
                                return new Tuple2(new Tuple2(source, offset), new Tuple2(source, offset));
                            });
                            return tuple2;
                        }
                    }
                    if (tuple2 != null) {
                        MicroBatchStream microBatchStream = (SparkDataStream) tuple2._1();
                        if (microBatchStream instanceof MicroBatchStream) {
                            MicroBatchStream microBatchStream2 = microBatchStream;
                            this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(microBatchStream2).toString());
                            tuple2 = (Tuple2) this.reportTimeTaken("latestOffset", () -> {
                                org.apache.spark.sql.connector.read.streaming.Offset latestOffset = microBatchStream2.latestOffset();
                                return new Tuple2(new Tuple2(microBatchStream2, Option$.MODULE$.apply(latestOffset)), new Tuple2(microBatchStream2, Option$.MODULE$.apply(latestOffset)));
                            });
                            return tuple2;
                        }
                    }
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    throw new IllegalStateException(new StringBuilder(19).append("Unexpected source: ").append((SparkDataStream) tuple2._1()).toString());
                }, Seq$.MODULE$.canBuildFrom())).unzip(Predef$.MODULE$.$conforms());
                if (unzip == null) {
                    throw new MatchError(unzip);
                }
                Tuple2 tuple22 = new Tuple2((Seq) unzip._1(), (Seq) unzip._2());
                Seq seq = (Seq) tuple22._1();
                Seq seq2 = (Seq) tuple22._2();
                this.availableOffsets_$eq(this.availableOffsets().m2307$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>) ((TraversableOnce) ((TraversableLike) seq.filter(tuple23 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$constructNextBatch$7(tuple23));
                })).map(tuple24 -> {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple24._1()), ((Option) tuple24._2()).get());
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())));
                this.latestOffsets_$eq(this.latestOffsets().m2307$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>) ((TraversableOnce) ((TraversableLike) seq2.filter(tuple25 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$constructNextBatch$9(tuple25));
                })).map(tuple26 -> {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple26._1()), ((Option) tuple26._2()).get());
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())));
                OffsetSeqMetadata offsetSeqMetadata = this.offsetSeqMetadata();
                this.offsetSeqMetadata_$eq(offsetSeqMetadata.copy(this.watermarkTracker().currentWatermark(), this.super$triggerClock().getTimeMillis(), offsetSeqMetadata.copy$default$3()));
                boolean z2 = z && Option$.MODULE$.apply(this.lastExecution()).exists(incrementalExecution -> {
                    return BoxesRunTime.boxToBoolean($anonfun$constructNextBatch$11(this, incrementalExecution));
                });
                boolean z3 = this.isNewDataAvailable() || z2;
                this.logTrace(() -> {
                    return new StringBuilder(113).append("noDataBatchesEnabled = ").append(z).append(", ").append("lastExecutionRequiresAnotherBatch = ").append(z2).append(", ").append("isNewDataAvailable = ").append(this.isNewDataAvailable()).append(", ").append("shouldConstructNextBatch = ").append(z3).toString();
                });
                if (z3) {
                    this.updateStatusMessage("Writing offsets to log");
                    this.reportTimeTaken("walCommit", () -> {
                        Predef$.MODULE$.assert(this.offsetLog().add(this.currentBatchId(), this.availableOffsets().toOffsetSeq(this.sources(), this.offsetSeqMetadata())), () -> {
                            return new StringBuilder(67).append("Concurrent update to the log. Multiple streaming jobs detected for ").append(this.currentBatchId()).toString();
                        });
                        this.logInfo(() -> {
                            return new StringBuilder(39).append("Committed offsets for batch ").append(this.currentBatchId()).append(". ").append("Metadata ").append(this.offsetSeqMetadata().toString()).toString();
                        });
                        if (this.currentBatchId() != 0) {
                            Option<OffsetSeq> option = this.offsetLog().get(this.currentBatchId() - 1);
                            if (!option.isDefined()) {
                                throw new IllegalStateException(new StringBuilder(20).append("batch ").append(this.currentBatchId() - 1).append(" doesn't exist").toString());
                            }
                            ((OffsetSeq) option.get()).toStreamProgress(this.sources()).foreach(tuple27 -> {
                                $anonfun$constructNextBatch$16(tuple27);
                                return BoxedUnit.UNIT;
                            });
                        }
                        if (this.minLogEntriesToMaintain() < this.currentBatchId()) {
                            this.purge(this.currentBatchId() - this.minLogEntriesToMaintain());
                        }
                    });
                    this.noNewData_$eq(false);
                } else {
                    this.noNewData_$eq(true);
                    this.awaitProgressLockCondition().signalAll();
                }
                return z3;
            }));
        } catch (NonLocalReturnControl e) {
            if (e.key() == obj) {
                return e.value$mcZ$sp();
            }
            throw e;
        }
    }

    /**
     * Executes the batch identified by {@code currentBatchId()}.
     *
     * <p>Steps, in order: (1) for every entry of {@code availableOffsets()} decide whether the
     * stream has an offset different from the committed one and, if so, record either the
     * batch's logical plan (for {@code Source}) or an {@code OffsetHolder} range (for
     * {@code MicroBatchStream}) in {@code newData}; (2) rewrite the logical plan; (3) build an
     * {@code IncrementalExecution}; (4) hand the resulting dataset to the sink; (5) while
     * holding the progress lock, update the watermark, append to the commit log and fold
     * {@code availableOffsets()} into {@code committedOffsets()}.
     *
     * @param sparkSession session used for local properties and for planning the batch
     * @throws IllegalArgumentException if the sink is neither a {@code Sink} nor a
     *         {@code SupportsWrite}
     */
    private void runBatch(SparkSession sparkSession) {
        LogicalPlan withNewBatchId;
        logDebug(() -> {
            return new StringBuilder(14).append("Running batch ").append(this.currentBatchId()).toString();
        });
        // Phase "getBatch": build the per-stream map of new data for this batch.
        newData_$eq((Map) reportTimeTaken("getBatch", () -> {
            return (Map) this.availableOffsets().flatMap(tuple2 -> {
                Iterable option2Iterable;
                org.apache.spark.sql.connector.read.streaming.Offset offset;
                // Case 1: a Source paired with an execution.streaming Offset.
                if (tuple2 != null) {
                    SparkDataStream sparkDataStream = (SparkDataStream) tuple2._1();
                    org.apache.spark.sql.connector.read.streaming.Offset offset2 = (org.apache.spark.sql.connector.read.streaming.Offset) tuple2._2();
                    if (sparkDataStream instanceof Source) {
                        Source source = (Source) sparkDataStream;
                        if (offset2 instanceof Offset) {
                            Offset offset3 = (Offset) offset2;
                            // Proceed when nothing is committed for this source yet, or the
                            // committed offset is not equal to the available one.
                            if (BoxesRunTime.unboxToBoolean(this.committedOffsets().get((SparkDataStream) source).map(offset4 -> {
                                return BoxesRunTime.boxToBoolean($anonfun$runBatch$4(offset3, offset4));
                            }).getOrElse(() -> {
                                return true;
                            }))) {
                                Option<Offset> map = this.committedOffsets().get((SparkDataStream) source).map(offset5 -> {
                                    return (Offset) offset5;
                                });
                                // Ask the source for the data between the committed offset (if any) and the available one.
                                Dataset<Row> batch = source.getBatch(map, offset3);
                                Predef$.MODULE$.assert(batch.isStreaming(), () -> {
                                    return new StringBuilder(67).append("DataFrame returned by getBatch from ").append(source).append(" did not have isStreaming=true\n").append(batch.queryExecution().logical()).toString();
                                });
                                this.logDebug(() -> {
                                    return new StringBuilder(27).append("Retrieving data from ").append(source).append(": ").append(map).append(" -> ").append(offset3).toString();
                                });
                                // Emit (source -> logical plan of the returned batch).
                                option2Iterable = Option$.MODULE$.option2Iterable(new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(source), batch.logicalPlan())));
                                return option2Iterable;
                            }
                        }
                    }
                }
                // Case 2: a MicroBatchStream.
                if (tuple2 != null) {
                    // NOTE(review): the declared type below does not match the cast; this looks
                    // like a decompiler artifact rather than hand-written code.
                    MicroBatchStream microBatchStream = (SparkDataStream) tuple2._1();
                    org.apache.spark.sql.connector.read.streaming.Offset offset6 = (org.apache.spark.sql.connector.read.streaming.Offset) tuple2._2();
                    if (microBatchStream instanceof MicroBatchStream) {
                        MicroBatchStream microBatchStream2 = microBatchStream;
                        // Same "nothing committed or committed != available" test as for Source.
                        if (BoxesRunTime.unboxToBoolean(this.committedOffsets().get((SparkDataStream) microBatchStream2).map(offset7 -> {
                            return BoxesRunTime.boxToBoolean($anonfun$runBatch$9(offset6, offset7));
                        }).getOrElse(() -> {
                            return true;
                        }))) {
                            // Committed offset, re-materialised by the stream from its JSON form.
                            Option map2 = this.committedOffsets().get((SparkDataStream) microBatchStream2).map(offset8 -> {
                                return microBatchStream2.deserializeOffset(offset8.json());
                            });
                            // A SerializedOffset is deserialized by the stream; any other
                            // non-null offset is used as is.
                            if (offset6 instanceof SerializedOffset) {
                                offset = microBatchStream2.deserializeOffset(((SerializedOffset) offset6).json());
                            } else {
                                if (offset6 == null) {
                                    throw new MatchError(offset6);
                                }
                                offset = offset6;
                            }
                            org.apache.spark.sql.connector.read.streaming.Offset offset9 = offset;
                            // Start of the range: committed offset, or the stream's initial offset when none is committed.
                            org.apache.spark.sql.connector.read.streaming.Offset offset10 = (org.apache.spark.sql.connector.read.streaming.Offset) map2.getOrElse(() -> {
                                return microBatchStream2.initialOffset();
                            });
                            this.logDebug(() -> {
                                return new StringBuilder(27).append("Retrieving data from ").append(microBatchStream2).append(": ").append(map2).append(" -> ").append(offset9).toString();
                            });
                            // Emit (stream -> OffsetHolder(start, end)).
                            option2Iterable = Option$.MODULE$.option2Iterable(new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(microBatchStream2), new OffsetHolder(offset10, offset9))));
                            return option2Iterable;
                        }
                    }
                }
                // Anything else (including streams whose offset has not changed) contributes no entry.
                option2Iterable = Option$.MODULE$.option2Iterable(None$.MODULE$);
                return option2Iterable;
            }, scala.collection.immutable.Map$.MODULE$.canBuildFrom());
        }));
        // Two plan rewrites; the rewrite rules live in MicroBatchExecution$$anonfun$2 / $3,
        // which are not visible here. The second only visits nodes containing CURRENT_LIKE.
        LogicalPlan transform = logicalPlan().transform(new MicroBatchExecution$$anonfun$2(this));
        LogicalPlan transformAllExpressionsWithPruning = transform.transformAllExpressionsWithPruning(treePatternBits -> {
            return BoxesRunTime.boxToBoolean($anonfun$runBatch$14(treePatternBits));
        }, transform.transformAllExpressionsWithPruning$default$2(), new MicroBatchExecution$$anonfun$3(this));
        // Pick the plan to execute depending on the sink kind.
        Table sink = sink();
        if (sink instanceof Sink) {
            withNewBatchId = transformAllExpressionsWithPruning;
        } else {
            if (!(sink instanceof SupportsWrite)) {
                throw new IllegalArgumentException(new StringBuilder(22).append("unknown sink type for ").append(sink()).toString());
            }
            // For SupportsWrite sinks the plan is cast to WriteToMicroBatchDataSource and stamped with the batch id.
            withNewBatchId = ((WriteToMicroBatchDataSource) transformAllExpressionsWithPruning).withNewBatchId(currentBatchId());
        }
        LogicalPlan logicalPlan = withNewBatchId;
        // Publish the batch id and the "not continuous processing" flag as SparkContext local properties.
        sparkSession.sparkContext().setLocalProperty(MicroBatchExecution$.MODULE$.BATCH_ID_KEY(), Long.toString(currentBatchId()));
        sparkSession.sparkContext().setLocalProperty(StreamExecution$.MODULE$.IS_CONTINUOUS_PROCESSING(), Boolean.toString(false));
        // Phase "queryPlanning": create the IncrementalExecution and request its executed plan.
        reportTimeTaken("queryPlanning", () -> {
            this.lastExecution_$eq(new IncrementalExecution(sparkSession, logicalPlan, this.outputMode(), this.checkpointFile("state"), this.id(), this.runId(), this.currentBatchId(), this.offsetSeqMetadata()));
            return this.lastExecution().executedPlan();
        });
        Dataset dataset = new Dataset(lastExecution(), RowEncoder$.MODULE$.apply(lastExecution().analyzed().schema()));
        // Phase "addBatch": run the batch under a new SQL execution id.
        Option option = (Option) reportTimeTaken("addBatch", () -> {
            return (Option) SQLExecution$.MODULE$.withNewExecutionId(this.lastExecution(), SQLExecution$.MODULE$.withNewExecutionId$default$2(), () -> {
                Object collect;
                Table sink2 = this.sink();
                if (sink2 instanceof Sink) {
                    ((Sink) sink2).addBatch(this.currentBatchId(), dataset);
                    collect = BoxedUnit.UNIT;
                } else {
                    if (!(sink2 instanceof SupportsWrite)) {
                        throw new MatchError(sink2);
                    }
                    // For SupportsWrite sinks the dataset is collected; the collected value is not used afterwards.
                    collect = dataset.collect();
                }
                // Result of the phase: the writer's commit progress when the executed plan is a
                // WriteToDataSourceV2Exec, otherwise None.
                SparkPlan executedPlan = this.lastExecution().executedPlan();
                return executedPlan instanceof WriteToDataSourceV2Exec ? ((WriteToDataSourceV2Exec) executedPlan).commitProgress() : None$.MODULE$;
            });
        });
        // Bookkeeping performed while holding the progress lock.
        withProgressLocked(() -> {
            this.sinkCommitProgress_$eq(option);
            this.watermarkTracker().updateWatermark(this.lastExecution().executedPlan());
            // The commit-log add is asserted to succeed; the message attributes a failure to a concurrent writer.
            Predef$.MODULE$.assert(this.commitLog().add(this.currentBatchId(), new CommitMetadata(this.watermarkTracker().currentWatermark())), () -> {
                return new StringBuilder(74).append("Concurrent update to the commit log. Multiple streaming jobs detected for ").append(this.currentBatchId()).toString();
            });
            // Everything that was available for this batch is now considered committed.
            this.committedOffsets_$eq(this.committedOffsets().m2307$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>) this.availableOffsets()));
        });
        logDebug(() -> {
            return new StringBuilder(16).append("Completed batch ").append(this.currentBatchId()).toString();
        });
    }

    /**
     * Evaluates {@code function0} while holding the lock returned by
     * {@code awaitProgressLock()}.
     *
     * <p>The lock is acquired before the function is invoked and released in a
     * {@code finally} block, so it is released whether the function returns or throws.
     *
     * @param function0 the computation to run under the lock
     * @param <T> result type of the computation
     * @return whatever {@code function0.apply()} returns
     */
    public <T> T withProgressLocked(Function0<T> function0) {
        awaitProgressLock().lock();
        T outcome;
        try {
            outcome = (T) function0.apply();
        } finally {
            awaitProgressLock().unlock();
        }
        return outcome;
    }

    /**
     * Synthetic lambda body: re-initialises the execution's offset metadata and watermark
     * tracker from a previously recorded {@code OffsetSeqMetadata}.
     *
     * <p>In order: pushes the metadata into the session conf, stores a fresh metadata object
     * built from the recorded watermark/timestamp plus the session conf, installs a new
     * watermark tracker built from the session conf, and seeds that tracker with the
     * recorded batch watermark.
     *
     * @param microBatchExecution the execution whose state is updated
     * @param sparkSession session supplying the runtime conf
     * @param offsetSeqMetadata the recorded metadata to restore from
     */
    public static final /* synthetic */ void $anonfun$populateStartOffsets$3(MicroBatchExecution microBatchExecution, SparkSession sparkSession, OffsetSeqMetadata offsetSeqMetadata) {
        final OffsetSeqMetadata$ metadataCompanion = OffsetSeqMetadata$.MODULE$;
        metadataCompanion.setSessionConf(offsetSeqMetadata, sparkSession.conf());
        microBatchExecution.offsetSeqMetadata_$eq(
                metadataCompanion.apply(
                        offsetSeqMetadata.batchWatermarkMs(),
                        offsetSeqMetadata.batchTimestampMs(),
                        sparkSession.conf()));

        final WatermarkTracker$ trackerCompanion = WatermarkTracker$.MODULE$;
        microBatchExecution.watermarkTracker_$eq(trackerCompanion.apply(sparkSession.conf()));
        microBatchExecution.watermarkTracker().setWatermark(offsetSeqMetadata.batchWatermarkMs());
    }

    /**
     * Synthetic lambda body: null-safe equality of two offsets.
     *
     * @param offset the offset passed as the argument to {@code equals}
     * @param offset2 the offset on which {@code equals} is invoked when it is non-null
     * @return {@code true} when both are {@code null}, or when {@code offset2} is non-null and
     *         {@code offset2.equals(offset)}
     */
    public static final /* synthetic */ boolean $anonfun$populateStartOffsets$8(Offset offset, Offset offset2) {
        if (offset2 == null) {
            return offset == null;
        }
        return offset2.equals(offset);
    }

    /**
     * Synthetic lambda body: null-safe inequality of two offsets.
     *
     * @param offset the offset passed as the argument to {@code equals}
     * @param offset2 the offset on which {@code equals} is invoked when it is non-null
     * @return {@code true} when exactly one of them is {@code null}, or when {@code offset2}
     *         is non-null and {@code offset2.equals(offset)} is {@code false}
     */
    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$2(org.apache.spark.sql.connector.read.streaming.Offset offset, org.apache.spark.sql.connector.read.streaming.Offset offset2) {
        if (offset2 == null) {
            return offset != null;
        }
        return !offset2.equals(offset);
    }

    /**
     * Synthetic lambda body: decides whether one (stream, offset) pair differs from what the
     * execution has committed for that stream.
     *
     * @param microBatchExecution the execution whose {@code committedOffsets()} is consulted
     * @param tuple2 pair of {@code SparkDataStream} and its offset
     * @return {@code true} when no committed offset exists for the stream, or when the
     *         committed offset is not equal to the pair's offset
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$1(MicroBatchExecution microBatchExecution, Tuple2 tuple2) {
        if (tuple2 != null) {
            SparkDataStream stream = (SparkDataStream) tuple2._1();
            org.apache.spark.sql.connector.read.streaming.Offset candidate = (org.apache.spark.sql.connector.read.streaming.Offset) tuple2._2();
            // Absent committed offset counts as "new data" (default of true).
            Object differs = microBatchExecution.committedOffsets().get(stream)
                    .map(committed -> BoxesRunTime.boxToBoolean($anonfun$isNewDataAvailable$2(candidate, committed)))
                    .getOrElse(() -> true);
            return BoxesRunTime.unboxToBoolean(differs);
        }
        throw new MatchError(tuple2);
    }

    /**
     * Synthetic lambda body: filter predicate keeping pairs whose second element is a
     * non-empty {@code Option}.
     *
     * @param tuple2 pair whose second element is an {@code Option}
     * @return {@code true} when that option is non-empty
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$constructNextBatch$7(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Option<?> maybeOffset = (Option) tuple2._2();
        return maybeOffset.nonEmpty();
    }

    /**
     * Synthetic lambda body: filter predicate keeping pairs whose second element is a
     * non-empty {@code Option}.
     *
     * @param tuple2 pair whose second element is an {@code Option}
     * @return {@code true} when that option is non-empty
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$constructNextBatch$9(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Option<?> maybeOffset = (Option) tuple2._2();
        return maybeOffset.nonEmpty();
    }

    /**
     * Synthetic lambda body: asks an execution whether another batch should run, given the
     * owning execution's current offset metadata.
     *
     * @param microBatchExecution supplies {@code offsetSeqMetadata()}
     * @param incrementalExecution the execution that is queried
     * @return the result of {@code shouldRunAnotherBatch} for that metadata
     */
    public static final /* synthetic */ boolean $anonfun$constructNextBatch$11(MicroBatchExecution microBatchExecution, IncrementalExecution incrementalExecution) {
        OffsetSeqMetadata currentMetadata = microBatchExecution.offsetSeqMetadata();
        return incrementalExecution.shouldRunAnotherBatch(currentMetadata);
    }

    /**
     * Synthetic lambda body: commits one (stream, offset) pair back to its stream.
     *
     * <p>Dispatch order:
     * <ol>
     *   <li>a {@code Source} paired with an execution-streaming {@code Offset} gets
     *       {@code source.commit(offset)};</li>
     *   <li>otherwise a {@code MicroBatchStream} gets {@code commit} with the offset
     *       re-materialised through {@code deserializeOffset(offset.json())};</li>
     *   <li>anything else is rejected.</li>
     * </ol>
     * A {@code Source} whose offset is not an execution-streaming {@code Offset} falls through
     * to the second check rather than being rejected immediately.
     *
     * @param tuple2 pair of {@code SparkDataStream} and connector {@code Offset}
     * @throws MatchError if {@code tuple2} is {@code null}
     * @throws IllegalArgumentException if the stream matches neither case above
     */
    public static final /* synthetic */ void $anonfun$constructNextBatch$16(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        SparkDataStream stream = (SparkDataStream) tuple2._1();
        org.apache.spark.sql.connector.read.streaming.Offset committed = (org.apache.spark.sql.connector.read.streaming.Offset) tuple2._2();

        if (stream instanceof Source && committed instanceof Offset) {
            ((Source) stream).commit((Offset) committed);
            return;
        }
        if (stream instanceof MicroBatchStream) {
            // Narrow explicitly: a SparkDataStream reference cannot be assigned to a
            // MicroBatchStream variable without a cast.
            MicroBatchStream microBatchStream = (MicroBatchStream) stream;
            microBatchStream.commit(microBatchStream.deserializeOffset(committed.json()));
            return;
        }
        throw new IllegalArgumentException(new StringBuilder(47).append("Unknown source is found at constructNextBatch: ").append(stream).toString());
    }

    /**
     * Synthetic lambda body: null-safe inequality between an offset and a second offset
     * supplied by the caller.
     *
     * @param offset the offset passed as the argument to {@code equals}
     * @param offset2 the offset on which {@code equals} is invoked when it is non-null
     * @return {@code true} when exactly one of them is {@code null}, or when {@code offset2}
     *         is non-null and {@code offset2.equals(offset)} is {@code false}
     */
    public static final /* synthetic */ boolean $anonfun$runBatch$4(Offset offset, org.apache.spark.sql.connector.read.streaming.Offset offset2) {
        if (offset2 == null) {
            return offset != null;
        }
        return !offset2.equals(offset);
    }

    /**
     * Synthetic lambda body: null-safe inequality of two connector offsets.
     *
     * @param offset the offset passed as the argument to {@code equals}
     * @param offset2 the offset on which {@code equals} is invoked when it is non-null
     * @return {@code true} when exactly one of them is {@code null}, or when {@code offset2}
     *         is non-null and {@code offset2.equals(offset)} is {@code false}
     */
    public static final /* synthetic */ boolean $anonfun$runBatch$9(org.apache.spark.sql.connector.read.streaming.Offset offset, org.apache.spark.sql.connector.read.streaming.Offset offset2) {
        if (offset2 == null) {
            return offset != null;
        }
        return !offset2.equals(offset);
    }

    /**
     * Synthetic lambda body: pruning predicate that tests a tree's pattern bits for the
     * {@code CURRENT_LIKE} pattern.
     *
     * @param treePatternBits the pattern bits to inspect
     * @return {@code true} when the bits contain {@code CURRENT_LIKE}
     */
    public static final /* synthetic */ boolean $anonfun$runBatch$14(TreePatternBits treePatternBits) {
        final TreePattern$ patterns = TreePattern$.MODULE$;
        return treePatternBits.containsPattern(patterns.CURRENT_LIKE());
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates a micro-batch execution for the given {@code WriteToStream} plan.
     *
     * <p>The name, checkpoint location, input query, sink, output mode and
     * delete-checkpoint-on-stop flag passed to the superclass are all read from
     * {@code writeToStream}. The trigger executor is chosen from the trigger type:
     * {@code ProcessingTimeTrigger} gets a {@code ProcessingTimeExecutor},
     * {@code OneTimeTrigger} a {@code SingleBatchExecutor}, and
     * {@code AvailableNowTrigger} a {@code MultiBatchExecutor}.
     *
     * @param sparkSession session handed to the superclass
     * @param trigger trigger handed to the superclass
     * @param clock clock handed to the superclass
     * @param map stored as {@code extraOptions}
     * @param writeToStream stored as {@code plan}
     * @throws IllegalStateException if the trigger is none of the three supported kinds
     */
    public MicroBatchExecution(SparkSession sparkSession, Trigger trigger, Clock clock, Map<String, String> map, WriteToStream writeToStream) {
        super(sparkSession, writeToStream.name(), writeToStream.resolvedCheckpointLocation(), writeToStream.inputQuery(), writeToStream.sink(), trigger, clock, writeToStream.outputMode(), writeToStream.deleteCheckpointOnStop());
        this.extraOptions = map;
        this.plan = writeToStream;
        this.sources = Nil$.MODULE$;

        Trigger configuredTrigger = super.trigger();
        Serializable executor;
        if (configuredTrigger instanceof ProcessingTimeTrigger) {
            executor = new ProcessingTimeExecutor((ProcessingTimeTrigger) configuredTrigger, super.triggerClock());
        } else if (OneTimeTrigger$.MODULE$.equals(configuredTrigger)) {
            executor = new SingleBatchExecutor();
        } else if (AvailableNowTrigger$.MODULE$.equals(configuredTrigger)) {
            executor = new MultiBatchExecutor();
        } else {
            throw new IllegalStateException(new StringBuilder(25).append("Unknown type of trigger: ").append(super.trigger()).toString());
        }
        this.triggerExecutor = executor;

        this.isCurrentBatchConstructed = false;
    }
}
