/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.catalyst.expressions.Alias;
import org.apache.spark.sql.catalyst.expressions.Alias$;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp;
import org.apache.spark.sql.catalyst.expressions.CurrentDate;
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp;
import org.apache.spark.sql.catalyst.expressions.ExprId;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.FileSourceMetadataAttribute$;
import org.apache.spark.sql.catalyst.expressions.LocalTimestamp;
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation;
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Project;
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2;
import org.apache.spark.sql.catalyst.streaming.WriteToStream;
import org.apache.spark.sql.catalyst.trees.TreePattern$;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
import org.apache.spark.sql.errors.QueryExecutionErrors$;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.datasources.DataSource;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits$;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$;
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation$;
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec;
import org.apache.spark.sql.execution.streaming.AcceptsLatestSeenOffsetHandler$;
import org.apache.spark.sql.execution.streaming.AvailableNowDataStreamWrapper;
import org.apache.spark.sql.execution.streaming.AvailableNowMicroBatchStreamWrapper;
import org.apache.spark.sql.execution.streaming.AvailableNowSourceWrapper;
import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
import org.apache.spark.sql.execution.streaming.CommitMetadata;
import org.apache.spark.sql.execution.streaming.IncrementalExecution;
import org.apache.spark.sql.execution.streaming.MicroBatchExecution$;
import org.apache.spark.sql.execution.streaming.MultiBatchExecutor;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.OffsetHolder;
import org.apache.spark.sql.execution.streaming.OffsetSeq;
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata;
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata$;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.execution.streaming.ProgressReporter;
import org.apache.spark.sql.execution.streaming.SerializedOffset;
import org.apache.spark.sql.execution.streaming.SingleBatchExecutor;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamExecution$;
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation;
import org.apache.spark.sql.execution.streaming.StreamingRelation;
import org.apache.spark.sql.execution.streaming.TERMINATED$;
import org.apache.spark.sql.execution.streaming.TriggerExecutor;
import org.apache.spark.sql.execution.streaming.WatermarkTracker;
import org.apache.spark.sql.execution.streaming.WatermarkTracker$;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource$;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.streaming.StreamingQueryStatus;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Product;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenIterable;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.generic.GenericTraversableTemplate;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.LongRef;
import scala.runtime.NonLocalReturnControl;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\u0005]h\u0001B\u0012%\u0001EB\u0011B\u000e\u0001\u0003\u0002\u0003\u0006IaN\u001e\t\u0013q\u0002!\u0011!Q\u0001\nu\u0012\u0005\"C\"\u0001\u0005\u0003\u0005\u000b\u0011\u0002#K\u0011!Y\u0005A!A!\u0002\u0013a\u0005\u0002\u0003/\u0001\u0005\u0003\u0005\u000b\u0011B/\t\u000b\u0011\u0004A\u0011A3\t\u000f1\u0004\u0001\u0019!C\t[\"I\u0011\u0011\u0001\u0001A\u0002\u0013E\u00111\u0001\u0005\b\u0003#\u0001\u0001\u0015)\u0003o\u0011%\tY\u0002\u0001b\u0001\n\u0013\ti\u0002\u0003\u0005\u00028\u0001\u0001\u000b\u0011BA\u0010\u0011-\tI\u0004\u0001a\u0001\u0002\u0004%I!a\u000f\t\u0017\u0005\r\u0003\u00011AA\u0002\u0013%\u0011Q\t\u0005\f\u0003\u0013\u0002\u0001\u0019!A!B\u0013\ti\u0004\u0003\u0006\u0002L\u0001A)\u0019!C!\u0003\u001bB\u0011\"a\u0018\u0001\u0001\u0004%I!!\u0019\t\u0013\u0005%\u0004\u00011A\u0005\n\u0005-\u0004\u0002CA8\u0001\u0001\u0006K!a\u0019\t\u000f\u0005E\u0004\u0001\"\u0011\u0002t!9\u0011Q\u000f\u0001\u0005R\u0005M\u0004bBA<\u0001\u0011E\u0011\u0011\u0010\u0005\b\u0003\u007f\u0002A\u0011BAA\u0011\u001d\t9\t\u0001C\u0005\u0003CBq!!#\u0001\t\u0013\tY\tC\u0004\u0002\u0018\u0002!I!!'\t\u000f\u0005}\u0005\u0001\"\u0003\u0002\"\"A\u0011q\u0015\u0001\u0005\u0002!\nI\u000bC\u0007\u0002N\u0002\u0001\n1!A\u0001\n\u0013\tym\u000f\u0005\u000e\u0003#\u0004\u0001\u0013aA\u0001\u0002\u0013%\u00111\u001b&\b\u000f\u0005UG\u0005#\u0001\u0002X\u001a11\u0005\nE\u0001\u00033Da\u0001Z\u0010\u0005\u0002\u0005\u0005\b\"CAr?\t\u0007I\u0011AAs\u0011!\t)p\bQ\u0001\n\u0005\u001d(aE'jGJ|')\u0019;dQ\u0016CXmY;uS>t'BA\u0013'\u0003%\u0019HO]3b[&twM\u0003\u0002(Q\u0005IQ\r_3dkRLwN\u001c\u0006\u0003S)\n1a]9m\u0015\tYC&A\u0003ta\u0006\u00148N\u0003\u0002.]\u00051\u0011\r]1dQ\u0016T\u0011aL\u0001\u0004_J<7\u0001A\n\u0003\u0001I\u0002\"a\r\u001b\u000e\u0003\u0011J!!\u000e\u0013\u0003\u001fM#(/Z1n\u000bb,7-\u001e;j_:\fAb\u001d9be.\u001cVm]:j_:\u0004\"\u0001O\u001d\u000e\u0003!J!A\u000f\u0015\u0003\u0019M\u0003\u0018M]6TKN\u001c\u0018n\u001c8\n\u0005Y\"\u0014a\u000
2;sS\u001e<WM\u001d\t\u0003}\u0001k\u0011a\u0010\u0006\u0003K!J!!Q \u0003\u000fQ\u0013\u0018nZ4fe&\u0011A\bN\u0001\riJLwmZ3s\u00072|7m\u001b\t\u0003\u000b\"k\u0011A\u0012\u0006\u0003\u000f*\nA!\u001e;jY&\u0011\u0011J\u0012\u0002\u0006\u00072|7m[\u0005\u0003\u0007R\nA\"\u001a=ue\u0006|\u0005\u000f^5p]N\u0004B!\u0014,Z3:\u0011a\n\u0016\t\u0003\u001fJk\u0011\u0001\u0015\u0006\u0003#B\na\u0001\u0010:p_Rt$\"A*\u0002\u000bM\u001c\u0017\r\\1\n\u0005U\u0013\u0016A\u0002)sK\u0012,g-\u0003\u0002X1\n\u0019Q*\u00199\u000b\u0005U\u0013\u0006CA'[\u0013\tY\u0006L\u0001\u0004TiJLgnZ\u0001\u0005a2\fg\u000e\u0005\u0002_E6\tqL\u0003\u0002&A*\u0011\u0011\rK\u0001\tG\u0006$\u0018\r\\=ti&\u00111m\u0018\u0002\u000e/JLG/\u001a+p'R\u0014X-Y7\u0002\rqJg.\u001b;?)\u00191w\r[5kWB\u00111\u0007\u0001\u0005\u0006m\u0019\u0001\ra\u000e\u0005\u0006y\u0019\u0001\r!\u0010\u0005\u0006\u0007\u001a\u0001\r\u0001\u0012\u0005\u0006\u0017\u001a\u0001\r\u0001\u0014\u0005\u00069\u001a\u0001\r!X\u0001\bg>,(oY3t+\u0005q\u0007cA8uo:\u0011\u0001O\u001d\b\u0003\u001fFL\u0011aU\u0005\u0003gJ\u000bq\u0001]1dW\u0006<W-\u0003\u0002vm\n\u00191+Z9\u000b\u0005M\u0014\u0006C\u0001=\u007f\u001b\u0005I(BA\u0013{\u0015\tYH0\u0001\u0003sK\u0006$'BA?)\u0003%\u0019wN\u001c8fGR|'/\u0003\u0002\u0000s\ny1\u000b]1sW\u0012\u000bG/Y*ue\u0016\fW.A\u0006t_V\u00148-Z:`I\u0015\fH\u0003BA\u0003\u0003\u001b\u0001B!a\u0002\u0002\n5\t!+C\u0002\u0002\fI\u0013A!\u00168ji\"A\u0011q\u0002\u0005\u0002\u0002\u0003\u0007a.A\u0002yIE\n\u0001b]8ve\u000e,7\u000f\t\u0015\u0004\u0013\u0005U\u0001\u0003BA\u0004\u0003/I1!!\u0007S\u0005!1x\u000e\\1uS2,\u0017a\u0004;sS\u001e<WM]#yK\u000e,Ho\u001c:\u0016\u0005\u0005}!\u0003CA\u0011\u0003K\tY#!\r\u0007\r\u0005\r\u0002\u0001AA\u0010\u00051a$/\u001a4j]\u0016lWM\u001c;?!\u0011\t9!a\n\n\u0007\u0005%\"KA\u0004Qe>$Wo\u0019;\u0011\t\u0005\u001d\u0011QF\u0005\u0004\u0003_\u0011&\u0001D*fe&\fG.\u001b>bE2,\u0007cA\u001a\u00024%\u0019\u0011Q\u0007\u0013\u0003\u001fQ\u0013\u0018nZ4fe\u0016CXmY;u_J\f\u0001\u0003\u001e:jO
\u001e,'/\u0012=fGV$xN\u001d\u0011\u0002!]\fG/\u001a:nCJ\\GK]1dW\u0016\u0014XCAA\u001f!\r\u0019\u0014qH\u0005\u0004\u0003\u0003\"#\u0001E,bi\u0016\u0014X.\u0019:l)J\f7m[3s\u0003Q9\u0018\r^3s[\u0006\u00148\u000e\u0016:bG.,'o\u0018\u0013fcR!\u0011QAA$\u0011%\ty!DA\u0001\u0002\u0004\ti$A\txCR,'/\\1sWR\u0013\u0018mY6fe\u0002\n1\u0002\\8hS\u000e\fG\u000e\u00157b]V\u0011\u0011q\n\t\u0005\u0003#\nY&\u0004\u0002\u0002T)!\u0011QKA,\u0003\u001dawnZ5dC2T1!!\u0017a\u0003\u0015\u0001H.\u00198t\u0013\u0011\ti&a\u0015\u0003\u00171{w-[2bYBc\u0017M\\\u0001\u001aSN\u001cUO\u001d:f]R\u0014\u0015\r^2i\u0007>t7\u000f\u001e:vGR,G-\u0006\u0002\u0002dA!\u0011qAA3\u0013\r\t9G\u0015\u0002\b\u0005>|G.Z1o\u0003uI7oQ;se\u0016tGOQ1uG\"\u001cuN\\:ueV\u001cG/\u001a3`I\u0015\fH\u0003BA\u0003\u0003[B\u0011\"a\u0004\u0012\u0003\u0003\u0005\r!a\u0019\u00025%\u001c8)\u001e:sK:$()\u0019;dQ\u000e{gn\u001d;sk\u000e$X\r\u001a\u0011\u0002\tM$x\u000e\u001d\u000b\u0003\u0003\u000b\tAb\u001d;beR$&/[4hKJ\f!C];o\u0003\u000e$\u0018N^1uK\u0012\u001cFO]3b[R!\u0011QAA>\u0011\u0019\ti(\u0006a\u0001o\u0005)2\u000f]1sWN+7o]5p]\u001a{'o\u0015;sK\u0006l\u0017\u0001\u00069paVd\u0017\r^3Ti\u0006\u0014Ho\u00144gg\u0016$8\u000f\u0006\u0003\u0002\u0006\u0005\r\u0005BBAC-\u0001\u0007q'\u0001\rta\u0006\u00148nU3tg&|g\u000eV8Sk:\u0014\u0015\r^2iKN\f!#[:OK^$\u0015\r^1Bm\u0006LG.\u00192mK\u0006qq-\u001a;Ti\u0006\u0014Ho\u00144gg\u0016$H\u0003BAG\u0003'\u00032\u0001_AH\u0013\r\t\t*\u001f\u0002\u0007\u001f\u001a47/\u001a;\t\r\u0005U\u0005\u00041\u0001x\u0003)!\u0017\r^1TiJ,\u0017-\\\u0001\u0013G>t7\u000f\u001e:vGRtU\r\u001f;CCR\u001c\u0007\u000e\u0006\u0003\u0002d\u0005m\u0005bBAO3\u0001\u0007\u00111M\u0001\u0015]>$\u0015\r^1CCR\u001c\u0007.Z:F]\u0006\u0014G.\u001a3\u0002\u0011I,hNQ1uG\"$B!!\u0002\u0002$\"1\u0011Q\u0015\u000eA\u0002]\nac\u001d9be.\u001cVm]:j_:$vNU;o\u0005\u0006$8\r[\u0001\u0013o&$\b\u000e\u0015:pOJ,7o\u001d'pG.,G-\u0006\u0003\u0002,\u0006EF\u0003BAW\u0003\u0007\u0004B!a,\u000222\u0001AaBAZ7\t\u0007\u0011Q\u0017\u000
2\u0002)F!\u0011qWA_!\u0011\t9!!/\n\u0007\u0005m&KA\u0004O_RD\u0017N\\4\u0011\t\u0005\u001d\u0011qX\u0005\u0004\u0003\u0003\u0014&aA!os\"A\u0011QY\u000e\u0005\u0002\u0004\t9-A\u0001g!\u0019\t9!!3\u0002.&\u0019\u00111\u001a*\u0003\u0011q\u0012\u0017P\\1nKz\n!c];qKJ$3\u000f]1sWN+7o]5p]V\tq'\u0001\ntkB,'\u000f\n;sS\u001e<WM]\"m_\u000e\\W#\u0001#\u0002'5K7M]8CCR\u001c\u0007.\u0012=fGV$\u0018n\u001c8\u0011\u0005Mz2cA\u0010\u0002\\B!\u0011qAAo\u0013\r\tyN\u0015\u0002\u0007\u0003:L(+\u001a4\u0015\u0005\u0005]\u0017\u0001\u0004\"B)\u000eCu,\u0013#`\u0017\u0016KVCAAt!\u0011\tI/a=\u000e\u0005\u0005-(\u0002BAw\u0003_\fA\u0001\\1oO*\u0011\u0011\u0011_\u0001\u0005U\u00064\u0018-C\u0002\\\u0003W\fQBQ!U\u0007\"{\u0016\nR0L\u000bf\u0003\u0003")
public class MicroBatchExecution
extends StreamExecution {
    // Cached result of logicalPlan$lzycompute(); only meaningful once bitmap$0 is true.
    private LogicalPlan logicalPlan;
    // Handed to WriteToMicroBatchDataSource while the logical plan is built;
    // logicalPlan$lzycompute() sets this reference to null before returning.
    private scala.collection.immutable.Map<String, String> extraOptions;
    // The WriteToStream node for this query; its catalogAndIdent() is consulted when
    // the sink relation is built in logicalPlan$lzycompute().
    private final WriteToStream plan;
    // Sources/streams collected from the rewritten plan; written through sources_$eq.
    private volatile Seq<SparkDataStream> sources;
    // Declared as Product here, but tested against SingleBatchExecutor/MultiBatchExecutor
    // and cast to TriggerExecutor where it is executed.
    private final Product triggerExecutor;
    // Read and written only through watermarkTracker() / watermarkTracker_$eq(...).
    private WatermarkTracker watermarkTracker;
    // Updated in runActivatedStream with the result of constructNextBatch(...).
    private boolean isCurrentBatchConstructed;
    // Initialization flag for logicalPlan: set to true at the end of logicalPlan$lzycompute().
    private volatile boolean bitmap$0;

    /**
     * Static forwarder that exposes the companion object's {@code BATCH_ID_KEY}.
     *
     * @return the value returned by {@code MicroBatchExecution$.MODULE$.BATCH_ID_KEY()}
     */
    public static String BATCH_ID_KEY() {
        final MicroBatchExecution$ companion = MicroBatchExecution$.MODULE$;
        return companion.BATCH_ID_KEY();
    }

    /**
     * Synthetic accessor that lets nested closures and anonymous classes reach the
     * superclass {@code sparkSession()} implementation.
     *
     * @return the session returned by {@code super.sparkSession()}
     */
    public /* synthetic */ SparkSession org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession() {
        final SparkSession parentSession = super.sparkSession();
        return parentSession;
    }

    /**
     * Synthetic accessor for the superclass {@code triggerClock()} implementation.
     *
     * @return the clock returned by {@code super.triggerClock()}
     */
    private /* synthetic */ Clock super$triggerClock() {
        final Clock parentClock = super.triggerClock();
        return parentClock;
    }

    /**
     * Returns the current value of the volatile {@code sources} field.
     *
     * @return the sequence of data streams most recently stored via {@code sources_$eq}
     */
    @Override
    public Seq<SparkDataStream> sources() {
        final Seq<SparkDataStream> currentSources = this.sources;
        return currentSources;
    }

    /**
     * Setter for the volatile {@code sources} field.
     *
     * @param x$1 the new sequence of data streams
     */
    public void sources_$eq(Seq<SparkDataStream> x$1) {
        final Seq<SparkDataStream> replacement = x$1;
        this.sources = replacement;
    }

    /**
     * Accessor for the {@code triggerExecutor} field.
     *
     * @return the stored executor, typed as {@code Product}
     */
    private Product triggerExecutor() {
        final Product executor = this.triggerExecutor;
        return executor;
    }

    /**
     * Accessor for the {@code watermarkTracker} field.
     *
     * @return the currently stored tracker
     */
    private WatermarkTracker watermarkTracker() {
        final WatermarkTracker tracker = this.watermarkTracker;
        return tracker;
    }

    /**
     * Setter for the {@code watermarkTracker} field.
     *
     * @param x$1 the tracker to store
     */
    private void watermarkTracker_$eq(WatermarkTracker x$1) {
        final WatermarkTracker replacement = x$1;
        this.watermarkTracker = replacement;
    }

    /**
     * Slow path of the lazily initialized {@code logicalPlan}.
     *
     * <p>While holding the monitor on {@code this}, and only if {@code bitmap$0} is still
     * false, this method:
     * <ol>
     *   <li>asserts that the caller is the query execution thread;</li>
     *   <li>rewrites the analyzed plan, replacing {@code StreamingRelation} and
     *       {@code StreamingRelationV2} (whose table is a {@code SupportsRead}) nodes with
     *       {@code StreamingExecutionRelation} or {@code StreamingDataSourceV2Relation};</li>
     *   <li>stores the sources/streams found in the rewritten plan via {@code sources_$eq};</li>
     *   <li>stores a source-to-{@code ReadLimit} map via {@code uniqueSources_$eq}, built
     *       differently depending on the runtime type of the trigger executor;</li>
     *   <li>wraps the plan in a {@code WriteToMicroBatchDataSource} when the sink is a
     *       {@code SupportsWrite}, otherwise uses the rewritten plan unchanged;</li>
     *   <li>stores the result in {@code logicalPlan} and sets {@code bitmap$0}.</li>
     * </ol>
     * After leaving the synchronized block it sets {@code extraOptions} to null and
     * returns the cached plan.
     *
     * @return the value of the {@code logicalPlan} field after initialization
     */
    private LogicalPlan logicalPlan$lzycompute() {
        MicroBatchExecution microBatchExecution = this;
        synchronized (microBatchExecution) {
            if (!this.bitmap$0) {
                LogicalPlan logicalPlan2;
                // Initialization is only allowed on the query execution thread.
                Predef$.MODULE$.assert(this.queryExecutionThread() == Thread.currentThread(), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(83).append("logicalPlan must be initialized in QueryExecutionThread ").append("but the current thread was ").append(Thread.currentThread()).toString());
                // Counter used to build each source's "<checkpointRoot>/sources/<id>" metadata path,
                // plus one cache per relation kind so a relation is converted only once.
                LongRef nextSourceId = LongRef.create((long)0L);
                Map toExecutionRelationMap = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
                Map v2ToExecutionRelationMap = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
                Map v2ToRelationMap = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
                Seq disabledSources = Utils$.MODULE$.stringToSeq(super.sparkSession().sqlContext().conf().disabledV2StreamingMicroBatchReaders());
                // Rewrite streaming relations in the analyzed plan into their executable counterparts.
                LogicalPlan _logicalPlan = (LogicalPlan)this.analyzedPlan().transform((PartialFunction)new scala.Serializable(this, toExecutionRelationMap, nextSourceId, disabledSources, v2ToRelationMap, v2ToExecutionRelationMap){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ MicroBatchExecution $outer;
                    private final Map toExecutionRelationMap$1;
                    private final LongRef nextSourceId$1;
                    private final Seq disabledSources$1;
                    private final Map v2ToRelationMap$1;
                    private final Map v2ToExecutionRelationMap$1;

                    public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                        A1 A1 = x1;
                        // V1 streaming relation: create (or reuse) a StreamingExecutionRelation backed by a V1 Source.
                        if (A1 instanceof StreamingRelation) {
                            StreamingRelation streamingRelation = (StreamingRelation)A1;
                            DataSource dataSourceV1 = streamingRelation.dataSource();
                            String sourceName = streamingRelation.sourceName();
                            Seq<Attribute> output = streamingRelation.output();
                            return (B1)this.toExecutionRelationMap$1.getOrElseUpdate((Object)((Object)streamingRelation), (Function0 & Serializable & scala.Serializable)() -> {
                                String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                                Source source = dataSourceV1.createSource(metadataPath);
                                ++$this.nextSourceId$1.elem;
                                $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(45).append("Using Source [").append(source).append("] from DataSourceV1 named '").append(sourceName).append("' [").append(dataSourceV1).append("]").toString());
                                return new StreamingExecutionRelation(source, output, $this.$outer.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession());
                            });
                        }
                        // V2 streaming relation: only handled when its table is a SupportsRead.
                        if (A1 instanceof StreamingRelationV2) {
                            StreamingRelationV2 streamingRelationV2 = (StreamingRelationV2)A1;
                            Option src = streamingRelationV2.source();
                            String srcName = streamingRelationV2.sourceName();
                            Table table = streamingRelationV2.table();
                            CaseInsensitiveStringMap options = streamingRelationV2.extraOptions();
                            Seq output = streamingRelationV2.output();
                            Option v1 = streamingRelationV2.v1Relation();
                            if (table instanceof SupportsRead) {
                                SupportsRead supportsRead = (SupportsRead)table;
                                String dsStr = src.nonEmpty() ? new StringBuilder(2).append("[").append(src.get()).append("]").toString() : "";
                                // The V2 path is skipped when the source's canonical class name is in the disabled list.
                                boolean v2Disabled = this.disabledSources$1.contains((Object)src.getOrElse((Function0 & Serializable & scala.Serializable)() -> None$.MODULE$).getClass().getCanonicalName());
                                if (!v2Disabled && DataSourceV2Implicits$.MODULE$.TableHelper((Table)supportsRead).supports(TableCapability.MICRO_BATCH_READ)) {
                                    return (B1)this.v2ToRelationMap$1.getOrElseUpdate((Object)streamingRelationV2, (Function0 & Serializable & scala.Serializable)() -> {
                                        String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                                        ++$this.nextSourceId$1.elem;
                                        $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("Reading table [").append(supportsRead).append("] from DataSourceV2 named '").append(srcName).append("' ").append(dsStr).toString());
                                        Scan scan = supportsRead.newScanBuilder(options).build();
                                        MicroBatchStream stream = scan.toMicroBatchStream(metadataPath);
                                        return new StreamingDataSourceV2Relation(output, scan, (SparkDataStream)stream, StreamingDataSourceV2Relation$.MODULE$.apply$default$4(), StreamingDataSourceV2Relation$.MODULE$.apply$default$5());
                                    });
                                }
                                // No usable V2 micro-batch path: fall back to the V1 relation, or fail if there is none.
                                if (v1.isEmpty()) {
                                    throw QueryExecutionErrors$.MODULE$.microBatchUnsupportedByDataSourceError(srcName);
                                }
                                return (B1)this.v2ToExecutionRelationMap$1.getOrElseUpdate((Object)streamingRelationV2, (Function0 & Serializable & scala.Serializable)() -> {
                                    String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                                    Source source = ((StreamingRelation)((Object)((Object)v1.get()))).dataSource().createSource(metadataPath);
                                    ++$this.nextSourceId$1.elem;
                                    $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(43).append("Using Source [").append(source).append("] from DataSourceV2 named '").append(srcName).append("' ").append(dsStr).toString());
                                    return new StreamingExecutionRelation(source, (Seq<Attribute>)output, $this.$outer.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession());
                                });
                            }
                        }
                        return (B1)function1.apply(x1);
                    }

                    public final boolean isDefinedAt(LogicalPlan x1) {
                        StreamingRelationV2 streamingRelationV2;
                        Table table;
                        LogicalPlan logicalPlan2 = x1;
                        if (logicalPlan2 instanceof StreamingRelation) {
                            return true;
                        }
                        return logicalPlan2 instanceof StreamingRelationV2 && (table = (streamingRelationV2 = (StreamingRelationV2)logicalPlan2).table()) instanceof SupportsRead;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.toExecutionRelationMap$1 = toExecutionRelationMap$1;
                        this.nextSourceId$1 = nextSourceId$1;
                        this.disabledSources$1 = disabledSources$1;
                        this.v2ToRelationMap$1 = v2ToRelationMap$1;
                        this.v2ToExecutionRelationMap$1 = v2ToExecutionRelationMap$1;
                    }

                    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                        return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$1(org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1 org.apache.spark.sql.execution.datasources.DataSource java.lang.String scala.collection.Seq ), $anonfun$applyOrElse$3(), $anonfun$applyOrElse$4(org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1 org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String java.lang.String org.apache.spark.sql.util.CaseInsensitiveStringMap scala.collection.Seq ), $anonfun$applyOrElse$6(org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1 scala.Option java.lang.String java.lang.String scala.collection.Seq ), $anonfun$applyOrElse$2(org.apache.spark.sql.execution.streaming.Source java.lang.String org.apache.spark.sql.execution.datasources.DataSource ), $anonfun$applyOrElse$5(org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String java.lang.String ), $anonfun$applyOrElse$7(org.apache.spark.sql.execution.streaming.Source java.lang.String java.lang.String )}, serializedLambda);
                    }
                });
                // Gather the source/stream objects out of the rewritten plan.
                this.sources_$eq((Seq<SparkDataStream>)_logicalPlan.collect((PartialFunction)new scala.Serializable(null){
                    public static final long serialVersionUID = 0L;

                    public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                        A1 A1 = x2;
                        if (A1 instanceof StreamingExecutionRelation) {
                            StreamingExecutionRelation streamingExecutionRelation = (StreamingExecutionRelation)A1;
                            return (B1)streamingExecutionRelation.source();
                        }
                        if (A1 instanceof StreamingDataSourceV2Relation) {
                            StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                            return (B1)streamingDataSourceV2Relation.stream();
                        }
                        return (B1)function1.apply(x2);
                    }

                    public final boolean isDefinedAt(LogicalPlan x2) {
                        LogicalPlan logicalPlan2 = x2;
                        if (logicalPlan2 instanceof StreamingExecutionRelation) {
                            return true;
                        }
                        return logicalPlan2 instanceof StreamingDataSourceV2Relation;
                    }
                }));
                // Pick a ReadLimit per distinct source, depending on the trigger executor's runtime type:
                //  - SingleBatchExecutor: always ReadLimit.allAvailable(), warning when a source's default differs;
                //  - MultiBatchExecutor: wrap sources lacking SupportsTriggerAvailableNow, call
                //    prepareForTriggerAvailableNow() on each, then use its default read limit;
                //  - anything else: the default read limit for SupportsAdmissionControl sources, allAvailable() otherwise.
                Product product = this.triggerExecutor();
                this.uniqueSources_$eq((scala.collection.immutable.Map<SparkDataStream, ReadLimit>)(product instanceof SingleBatchExecutor ? ((TraversableOnce)((TraversableLike)this.sources().distinct()).map((Function1 & Serializable & scala.Serializable)x0$1 -> {
                    SparkDataStream sparkDataStream = x0$1;
                    if (sparkDataStream instanceof SupportsAdmissionControl) {
                        ReadLimit limit;
                        SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl)sparkDataStream;
                        ReadLimit readLimit = limit = supportsAdmissionControl.getDefaultReadLimit();
                        ReadLimit readLimit2 = ReadLimit.allAvailable();
                        if (readLimit == null ? readLimit2 != null : !readLimit.equals(readLimit2)) {
                            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(58).append("The read limit ").append(limit).append(" for ").append(supportsAdmissionControl).append(" is ignored when Trigger.Once is used.").toString());
                        }
                        return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)supportsAdmissionControl), (Object)ReadLimit.allAvailable());
                    }
                    return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)sparkDataStream), (Object)ReadLimit.allAvailable());
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()) : (product instanceof MultiBatchExecutor ? ((TraversableOnce)((TraversableLike)((TraversableLike)this.sources().distinct()).map((Function1 & Serializable & scala.Serializable)x0$2 -> {
                    SparkDataStream sparkDataStream = x0$2;
                    if (sparkDataStream instanceof SupportsTriggerAvailableNow) {
                        SupportsTriggerAvailableNow supportsTriggerAvailableNow = (SupportsTriggerAvailableNow)sparkDataStream;
                        return supportsTriggerAvailableNow;
                    }
                    if (sparkDataStream instanceof Source) {
                        Source source = (Source)sparkDataStream;
                        return new AvailableNowSourceWrapper(source);
                    }
                    if (sparkDataStream instanceof MicroBatchStream) {
                        MicroBatchStream microBatchStream = (MicroBatchStream)sparkDataStream;
                        return new AvailableNowMicroBatchStreamWrapper(microBatchStream);
                    }
                    throw new MatchError((Object)sparkDataStream);
                }, Seq$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)s -> {
                    s.prepareForTriggerAvailableNow();
                    return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(s), (Object)s.getDefaultReadLimit());
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()) : ((TraversableOnce)((TraversableLike)this.sources().distinct()).map((Function1 & Serializable & scala.Serializable)x0$3 -> {
                    SparkDataStream sparkDataStream = x0$3;
                    if (sparkDataStream instanceof SupportsAdmissionControl) {
                        SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl)sparkDataStream;
                        return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)supportsAdmissionControl), (Object)supportsAdmissionControl.getDefaultReadLimit());
                    }
                    return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)sparkDataStream), (Object)ReadLimit.allAvailable());
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()))));
                // When the sink supports writes, wrap the rewritten plan in a WriteToMicroBatchDataSource node.
                Table table = this.sink();
                if (table instanceof SupportsWrite) {
                    SupportsWrite supportsWrite = (SupportsWrite)table;
                    Option relationOpt = this.plan.catalogAndIdent().map((Function1 & Serializable & scala.Serializable)x0$4 -> {
                        Tuple2 tuple2 = x0$4;
                        if (tuple2 != null) {
                            TableCatalog catalog = (TableCatalog)tuple2._1();
                            Identifier ident = (Identifier)tuple2._2();
                            return DataSourceV2Relation$.MODULE$.create((Table)supportsWrite, (Option)new Some((Object)catalog), (Option)new Some((Object)ident));
                        }
                        throw new MatchError((Object)tuple2);
                    });
                    logicalPlan2 = new WriteToMicroBatchDataSource((Option<DataSourceV2Relation>)relationOpt, supportsWrite, _logicalPlan, this.id().toString(), this.extraOptions, this.outputMode(), WriteToMicroBatchDataSource$.MODULE$.apply$default$7());
                } else {
                    logicalPlan2 = _logicalPlan;
                }
                // Publish the result; bitmap$0 is volatile and is written after logicalPlan.
                this.logicalPlan = logicalPlan2;
                this.bitmap$0 = true;
            }
        }
        // NOTE(review): executed on every call and outside the synchronized block; presumably
        // compiler-generated cleanup of a value only needed during initialization -- confirm.
        this.extraOptions = null;
        return this.logicalPlan;
    }

    /**
     * Returns the logical plan, computing it on first access.
     *
     * <p>If the {@code bitmap$0} flag is already set the cached field is returned
     * directly; otherwise the work is delegated to {@code logicalPlan$lzycompute()}.
     *
     * @return the cached or freshly computed logical plan
     */
    @Override
    public LogicalPlan logicalPlan() {
        return this.bitmap$0 ? this.logicalPlan : this.logicalPlan$lzycompute();
    }

    /**
     * Accessor for the {@code isCurrentBatchConstructed} flag.
     *
     * @return the current value of the flag
     */
    private boolean isCurrentBatchConstructed() {
        final boolean constructed = this.isCurrentBatchConstructed;
        return constructed;
    }

    /**
     * Setter for the {@code isCurrentBatchConstructed} flag.
     *
     * @param x$1 the new value of the flag
     */
    private void isCurrentBatchConstructed_$eq(boolean x$1) {
        final boolean constructed = x$1;
        this.isCurrentBatchConstructed = constructed;
    }

    /**
     * Stops this query.
     *
     * <p>The state is set to {@code TERMINATED} first. If the query execution thread is
     * still alive, the job group named after the run id is cancelled, the thread is
     * interrupted and awaited, and the job group is cancelled a second time. A log line
     * is written in every case.
     */
    @Override
    public void stop() {
        // Mark the query as terminated before touching the execution thread.
        this.state().set(TERMINATED$.MODULE$);
        final boolean executionThreadAlive = this.queryExecutionThread().isAlive();
        if (executionThreadAlive) {
            super.sparkSession().sparkContext().cancelJobGroup(this.runId().toString());
            this.interruptAndAwaitExecutionThreadTermination();
            // NOTE(review): the second cancellation presumably catches jobs submitted
            // while the thread was shutting down -- confirm.
            super.sparkSession().sparkContext().cancelJobGroup(this.runId().toString());
        }
        final Function0<String> stoppedMessage = (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Query ").append(this.prettyIdString()).append(" was stopped").toString();
        this.logInfo(stoppedMessage);
    }

    /**
     * Runs the trait's {@code startTrigger} logic and then replaces the current status with
     * a copy whose third field is {@code true}; the first two fields keep their current
     * values (obtained through the {@code copy$default$N} accessors).
     *
     * <p>NOTE(review): in Spark's {@code StreamingQueryStatus} the third field is
     * {@code isTriggerActive} -- confirm for this build.
     */
    @Override
    public void startTrigger() {
        ProgressReporter.startTrigger$(this);
        StreamingQueryStatus status = this.currentStatus();
        String keptFirst = status.copy$default$1();
        boolean keptSecond = status.copy$default$2();
        StreamingQueryStatus updated = status.copy(keptFirst, keptSecond, true);
        this.currentStatus_$eq(updated);
    }

    /**
     * Drives the micro-batch loop by handing a per-trigger callback to the trigger executor.
     *
     * <p>Each invocation of the callback, while the query is active:
     * <ol>
     *   <li>starts a trigger and, inside the timed "triggerExecution" section, initialises
     *       start offsets on the first run ({@code currentBatchId() < 0}), tries to construct
     *       the next batch if none is pending, records the trigger offsets, updates the
     *       status and runs the batch when one is constructed;</li>
     *   <li>finishes the trigger and signals threads waiting on the progress lock;</li>
     *   <li>either advances the batch id, terminates the query when the executor is a
     *       {@code MultiBatchExecutor}, or sleeps for {@code pollingDelayMs()}.</li>
     * </ol>
     * The callback returns {@code isActive()}.
     * NOTE(review): the executor presumably uses that value to decide whether to keep
     * looping -- confirm against {@code TriggerExecutor}.
     *
     * @param sparkSessionForStream session used to read the no-data-batches setting, to
     *                              populate start offsets and to run batches
     */
    @Override
    public void runActivatedStream(SparkSession sparkSessionForStream) {
        boolean noDataBatchesEnabled = sparkSessionForStream.sessionState().conf().streamingNoDataMicroBatchesEnabled();
        ((TriggerExecutor)this.triggerExecutor()).execute((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
            if (this.isActive()) {
                // Mutable holder so the nested timed section can report whether the batch had data.
                BooleanRef currentBatchHasNewData = BooleanRef.create((boolean)false);
                this.startTrigger();
                this.reportTimeTaken("triggerExecution", (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                    // A negative batch id means offsets have not been initialised for this run yet.
                    if (this.currentBatchId() < 0L) {
                        AcceptsLatestSeenOffsetHandler$.MODULE$.setLatestSeenOffsetOnSources((Option<OffsetSeq>)this.offsetLog().getLatest().map((Function1 & Serializable & scala.Serializable)x$1 -> (OffsetSeq)x$1._2()), this.sources());
                        this.populateStartOffsets(sparkSessionForStream);
                        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(20).append("Stream started from ").append(this.committedOffsets()).toString());
                    }
                    // Set the job description before constructNextBatch() is called.
                    this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession().sparkContext().setJobDescription(this.getBatchDescriptionString());
                    this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession().sparkContext().setJobDoAsUser();
                    if (!this.isCurrentBatchConstructed()) {
                        this.isCurrentBatchConstructed_$eq(this.constructNextBatch(noDataBatchesEnabled));
                    }
                    this.recordTriggerOffsets(this.committedOffsets(), this.availableOffsets(), this.latestOffsets());
                    // Capture the data-availability answer before the batch runs; it is
                    // consumed by finishTrigger() after the timed section.
                    // Fix: use the declared BooleanRef; the previous `currentBatchHasNewData$1`
                    // name was never declared in this scope.
                    currentBatchHasNewData.elem = this.isNewDataAvailable();
                    StreamingQueryStatus qual$1 = this.currentStatus();
                    boolean x$12 = this.isNewDataAvailable();
                    String x$2 = qual$1.copy$default$1();
                    boolean x$3 = qual$1.copy$default$3();
                    // Only the second status field is replaced; first and third keep their values.
                    this.currentStatus_$eq(qual$1.copy(x$2, x$12, x$3));
                    if (this.isCurrentBatchConstructed()) {
                        if (currentBatchHasNewData.elem) {
                            this.updateStatusMessage("Processing new data");
                        } else {
                            this.updateStatusMessage("No new data but cleaning up state");
                        }
                        this.runBatch(sparkSessionForStream);
                        return;
                    }
                    this.updateStatusMessage("Waiting for data to arrive");
                });
                // Called after (outside) the timed "triggerExecution" section.
                this.finishTrigger(currentBatchHasNewData.elem, this.isCurrentBatchConstructed());
                // Wake up threads waiting on the progress condition once the trigger is finished.
                this.withProgressLocked((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.awaitProgressLockCondition().signalAll());
                if (this.isCurrentBatchConstructed()) {
                    // A batch was executed: move to the next id and clear the flag.
                    this.currentBatchId_$eq(this.currentBatchId() + 1L);
                    this.isCurrentBatchConstructed_$eq(false);
                } else if (this.triggerExecutor() instanceof MultiBatchExecutor) {
                    this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Finished processing all available data for the trigger, terminating this Trigger.AvailableNow query");
                    this.state().set(TERMINATED$.MODULE$);
                } else {
                    // Nothing to run: wait before the next attempt.
                    Thread.sleep(this.pollingDelayMs());
                }
            }
            this.updateStatusMessage("Waiting for next trigger");
            return this.isActive();
        });
    }

    /**
     * Initialises batch bookkeeping from the offset log and the commit log.
     *
     * <p>When the offset log has a latest entry, {@code currentBatchId},
     * {@code availableOffsets} and (for a non-zero batch id) {@code committedOffsets} are
     * restored from it, and the commit log is then consulted to decide whether that batch
     * must be re-run or whether the query can move on to the next batch id. When the offset
     * log is empty, the query starts at batch 0 with a fresh watermark tracker.
     *
     * @param sparkSessionToRunBatches session whose conf is used to build the watermark
     *                                 tracker and is passed to the metadata helper
     * @throws IllegalStateException if the offset-log entry preceding the latest batch is missing
     */
    private void populateStartOffsets(SparkSession sparkSessionToRunBatches) {
        Some some;
        Tuple2 tuple2;
        this.sinkCommitProgress_$eq((Option<StreamWriterCommitProgress>)None$.MODULE$);
        Option option = this.offsetLog().getLatest();
        if (option instanceof Some && (tuple2 = (Tuple2)(some = (Some)option).value()) != null) {
            Some some2;
            Tuple2 tuple22;
            long latestBatchId = tuple2._1$mcJ$sp();
            OffsetSeq nextOffsets = (OffsetSeq)tuple2._2();
            // Start from the assumption that the latest logged batch has to be re-executed;
            // the commit-log check below may revise this.
            this.currentBatchId_$eq(latestBatchId);
            this.isCurrentBatchConstructed_$eq(true);
            this.availableOffsets_$eq(nextOffsets.toStreamProgress(this.sources()));
            if (latestBatchId != 0L) {
                // Committed offsets are taken from the offset-log entry just before the latest one;
                // a missing entry is logged and reported as an IllegalStateException.
                OffsetSeq secondLatestOffsets = (OffsetSeq)this.offsetLog().get(latestBatchId - 1L).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                    this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(366).append("The offset log for batch ").append(latestBatchId - 1L).append(" doesn't exist, ").append("which is required to restart the query from the latest batch ").append(latestBatchId).append(" ").append("from the offset log. Please ensure there are two subsequent offset logs ").append("available for the latest batch via manually deleting the offset file(s). ").append("Please also ensure the latest batch for commit log is equal or one batch ").append("earlier than the latest batch for offset log.").toString());
                    throw new IllegalStateException(new StringBuilder(20).append("batch ").append(latestBatchId - 1L).append(" doesn't exist").toString());
                });
                this.committedOffsets_$eq(secondLatestOffsets.toStreamProgress(this.sources()));
            }
            // Per-metadata handling lives in a synthetic helper that is not visible in this chunk.
            // NOTE(review): presumably restores session conf / offset metadata / watermark -- confirm.
            nextOffsets.metadata().foreach((Function1 & Serializable & scala.Serializable)metadata -> {
                MicroBatchExecution.$anonfun$populateStartOffsets$3(this, sparkSessionToRunBatches, metadata);
                return BoxedUnit.UNIT;
            });
            Option option2 = this.commitLog().getLatest();
            if (option2 instanceof Some && (tuple22 = (Tuple2)(some2 = (Some)option2).value()) != null) {
                long latestCommittedBatchId = tuple22._1$mcJ$sp();
                CommitMetadata commitMetadata = (CommitMetadata)tuple22._2();
                if (latestBatchId == latestCommittedBatchId) {
                    // The latest logged batch is also the latest committed one. getBatch is still
                    // invoked on every (Source, Offset) pair; other pairs are skipped.
                    // NOTE(review): the returned value is presumably discarded by foreach.
                    this.availableOffsets().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                        Tuple2 tuple2 = x0$1;
                        if (tuple2 != null) {
                            SparkDataStream source = (SparkDataStream)tuple2._1();
                            org.apache.spark.sql.connector.read.streaming.Offset end = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
                            if (source instanceof Source) {
                                Source source2 = (Source)source;
                                if (end instanceof Offset) {
                                    Offset offset = (Offset)end;
                                    Option start = this.committedOffsets().get(source2).map((Function1 & Serializable & scala.Serializable)x$2 -> (Offset)((Object)((Object)x$2)));
                                    return source2.getBatch((Option<Offset>)start, offset);
                                }
                            }
                        }
                        return BoxedUnit.UNIT;
                    });
                    // Move on to the batch after the committed one; nothing is pending any more.
                    this.currentBatchId_$eq(latestCommittedBatchId + 1L);
                    this.isCurrentBatchConstructed_$eq(false);
                    this.committedOffsets_$eq(this.committedOffsets().$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>)this.availableOffsets()));
                    // Never move the watermark backwards: keep the larger of the tracked and committed values.
                    this.watermarkTracker().setWatermark(package$.MODULE$.max(this.watermarkTracker().currentWatermark(), commitMetadata.nextBatchWatermarkMs()));
                } else if (latestCommittedBatchId == latestBatchId - 1L) {
                    // The latest logged batch was not committed. getBatch is called for a
                    // (Source, Offset) pair only when there is no committed start offset or the
                    // synthetic predicate (not visible in this chunk) returns true for it.
                    this.availableOffsets().foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
                        Tuple2 tuple2 = x0$2;
                        if (tuple2 != null) {
                            SparkDataStream source = (SparkDataStream)tuple2._1();
                            org.apache.spark.sql.connector.read.streaming.Offset end = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
                            if (source instanceof Source) {
                                Source source2 = (Source)source;
                                if (end instanceof Offset) {
                                    Offset offset = (Offset)end;
                                    Option start = this.committedOffsets().get(source2).map((Function1 & Serializable & scala.Serializable)x$3 -> (Offset)((Object)((Object)x$3)));
                                    if (BoxesRunTime.unboxToBoolean((Object)start.map((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$populateStartOffsets$8(offset, x$4))).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> true))) {
                                        return source2.getBatch((Option<Offset>)start, offset);
                                    }
                                    return BoxedUnit.UNIT;
                                }
                            }
                        }
                        return BoxedUnit.UNIT;
                    });
                } else if (latestCommittedBatchId < latestBatchId - 1L) {
                    // The commit log lags the offset log by more than one batch: only a warning is logged.
                    this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(79).append("Batch completion log latest batch id is ").append(latestCommittedBatchId).append(", which is not trailing ").append("batchid ").append(latestBatchId).append(" by one").toString());
                }
            } else if (None$.MODULE$.equals(option2)) {
                this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "no commit log present");
            } else {
                throw new MatchError(option2);
            }
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(65).append("Resuming at batch ").append(this.currentBatchId()).append(" with committed offsets ").append(this.committedOffsets()).append(" and available offsets ").append(this.availableOffsets()).toString());
            return;
        }
        if (None$.MODULE$.equals(option)) {
            // Empty offset log: start from batch 0 with a new watermark tracker.
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Starting new streaming query.");
            this.currentBatchId_$eq(0L);
            this.watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSessionToRunBatches.conf()));
            return;
        }
        throw new MatchError(option);
    }

    /**
     * Tells whether at least one entry of {@code availableOffsets()} satisfies the
     * per-entry check implemented by the synthetic helper
     * {@code $anonfun$isNewDataAvailable$1}.
     *
     * @return {@code true} if the check holds for any (stream, offset) entry
     */
    private boolean isNewDataAvailable() {
        Function1<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>, Object> entryHasNewData = (Function1<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>, Object>)(Function1 & Serializable & scala.Serializable)entry -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$isNewDataAvailable$1(this, entry));
        return this.availableOffsets().exists(entryHasNewData);
    }

    /**
     * Resolves the offset from which the given stream should start reading.
     *
     * <ul>
     *   <li>For a {@code Source}: the entry in {@code availableOffsets()}, or {@code null}
     *       when there is none.</li>
     *   <li>For a {@code MicroBatchStream}: the entry re-created through
     *       {@code deserializeOffset(offset.json())}, or {@code initialOffset()} when there
     *       is none.</li>
     * </ul>
     *
     * @param dataStream the stream to look up
     * @return the start offset, possibly {@code null} for a {@code Source}
     * @throws MatchError if the stream is neither a {@code Source} nor a {@code MicroBatchStream}
     */
    private org.apache.spark.sql.connector.read.streaming.Offset getStartOffset(SparkDataStream dataStream) {
        Option<org.apache.spark.sql.connector.read.streaming.Offset> knownOffset = this.availableOffsets().get(dataStream);
        if (dataStream instanceof Source) {
            return (org.apache.spark.sql.connector.read.streaming.Offset)knownOffset.orNull(Predef$.MODULE$.$conforms());
        }
        if (!(dataStream instanceof MicroBatchStream)) {
            throw new MatchError((Object)dataStream);
        }
        MicroBatchStream v2Stream = (MicroBatchStream)dataStream;
        Option deserialized = knownOffset.map((Function1 & Serializable & scala.Serializable)logged -> v2Stream.deserializeOffset(logged.json()));
        return (org.apache.spark.sql.connector.read.streaming.Offset)deserialized.getOrElse((Function0 & Serializable & scala.Serializable)() -> v2Stream.initialOffset());
    }

    /**
     * Attempts to construct the next micro-batch while holding the progress lock.
     *
     * <p>Steps performed inside the lock:
     * <ol>
     *   <li>If a batch is already constructed, return {@code true} immediately (implemented
     *       as a non-local return via {@code NonLocalReturnControl}).</li>
     *   <li>Ask every unique source for its next and latest offsets and merge the defined
     *       ones into {@code availableOffsets} and {@code latestOffsets}.</li>
     *   <li>Refresh {@code offsetSeqMetadata} with the current watermark and trigger-clock time.</li>
     *   <li>Decide whether a batch should be built: new data is available, or no-data
     *       batches are enabled and the last execution asks for another batch.</li>
     *   <li>If so, write the offsets to the offset log, run the per-source helper on the
     *       previous batch's offsets, and purge old log entries; otherwise mark
     *       {@code noNewData} and signal waiters.</li>
     * </ol>
     *
     * @param noDataBatchesEnabled whether a batch may be constructed without new data
     * @return {@code true} if a batch is (already or newly) constructed, {@code false} otherwise
     * @throws IllegalStateException if a source has an unexpected type or the previous
     *                               batch's offset-log entry is missing
     */
    private boolean constructNextBatch(boolean noDataBatchesEnabled) {
        boolean bl;
        // Unique key that identifies non-local returns originating from this invocation.
        Object object = new Object();
        try {
            bl = BoxesRunTime.unboxToBoolean(this.withProgressLocked((Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                if (this.isCurrentBatchConstructed()) {
                    // Early exit: the batch already exists, report `true` to the caller.
                    throw new NonLocalReturnControl.mcZ.sp(object, true);
                }
                // For each source produce a pair ((source, nextOffset), (source, latestOffset)).
                Tuple2 tuple2 = ((GenericTraversableTemplate)this.uniqueSources().toSeq().map((Function1 & Serializable & scala.Serializable)x0$1 -> {
                    SparkDataStream s;
                    SparkDataStream s2;
                    Tuple2 tuple2 = x0$1;
                    if (tuple2 != null) {
                        SparkDataStream s3 = (SparkDataStream)tuple2._1();
                        ReadLimit limit = (ReadLimit)tuple2._2();
                        if (s3 instanceof AvailableNowDataStreamWrapper) {
                            AvailableNowDataStreamWrapper availableNowDataStreamWrapper = (AvailableNowDataStreamWrapper)s3;
                            this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(availableNowDataStreamWrapper).toString());
                            // Offsets are keyed by the wrapped (delegate) source, not by the wrapper.
                            SparkDataStream originalSource = availableNowDataStreamWrapper.delegate();
                            return (Tuple2)this.reportTimeTaken("latestOffset", (Function0 & Serializable & scala.Serializable)() -> {
                                org.apache.spark.sql.connector.read.streaming.Offset next = availableNowDataStreamWrapper.latestOffset(this.getStartOffset(originalSource), limit);
                                org.apache.spark.sql.connector.read.streaming.Offset latest = availableNowDataStreamWrapper.reportLatestOffset();
                                return new Tuple2((Object)new Tuple2((Object)originalSource, (Object)Option$.MODULE$.apply((Object)next)), (Object)new Tuple2((Object)originalSource, (Object)Option$.MODULE$.apply((Object)latest)));
                            });
                        }
                    }
                    if (tuple2 != null) {
                        SparkDataStream s4 = (SparkDataStream)tuple2._1();
                        ReadLimit limit = (ReadLimit)tuple2._2();
                        if (s4 instanceof SupportsAdmissionControl) {
                            SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl)s4;
                            this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(supportsAdmissionControl).toString());
                            return (Tuple2)this.reportTimeTaken("latestOffset", (Function0 & Serializable & scala.Serializable)() -> {
                                org.apache.spark.sql.connector.read.streaming.Offset next = supportsAdmissionControl.latestOffset(this.getStartOffset((SparkDataStream)supportsAdmissionControl), limit);
                                org.apache.spark.sql.connector.read.streaming.Offset latest = supportsAdmissionControl.reportLatestOffset();
                                return new Tuple2((Object)new Tuple2((Object)supportsAdmissionControl, (Object)Option$.MODULE$.apply((Object)next)), (Object)new Tuple2((Object)supportsAdmissionControl, (Object)Option$.MODULE$.apply((Object)latest)));
                            });
                        }
                    }
                    if (tuple2 != null && (s2 = (SparkDataStream)tuple2._1()) instanceof Source) {
                        Source source = (Source)s2;
                        this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(source).toString());
                        return (Tuple2)this.reportTimeTaken("getOffset", (Function0 & Serializable & scala.Serializable)() -> {
                            // The same offset serves as both "next" and "latest" for this kind of source.
                            Option<Offset> offset = source.getOffset();
                            return new Tuple2((Object)new Tuple2((Object)source, offset), (Object)new Tuple2((Object)source, offset));
                        });
                    }
                    if (tuple2 != null && (s = (SparkDataStream)tuple2._1()) instanceof MicroBatchStream) {
                        MicroBatchStream microBatchStream = (MicroBatchStream)s;
                        this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(microBatchStream).toString());
                        return (Tuple2)this.reportTimeTaken("latestOffset", (Function0 & Serializable & scala.Serializable)() -> {
                            org.apache.spark.sql.connector.read.streaming.Offset latest = microBatchStream.latestOffset();
                            return new Tuple2((Object)new Tuple2((Object)microBatchStream, (Object)Option$.MODULE$.apply((Object)latest)), (Object)new Tuple2((Object)microBatchStream, (Object)Option$.MODULE$.apply((Object)latest)));
                        });
                    }
                    if (tuple2 != null) {
                        SparkDataStream s5 = (SparkDataStream)tuple2._1();
                        throw new IllegalStateException(new StringBuilder(19).append("Unexpected source: ").append(s5).toString());
                    }
                    throw new MatchError((Object)tuple2);
                }, Seq$.MODULE$.canBuildFrom())).unzip((Function1)Predef$.MODULE$.$conforms());
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                Seq nextOffsets = (Seq)tuple2._1();
                Seq recentOffsets = (Seq)tuple2._2();
                Tuple2 tuple22 = new Tuple2((Object)nextOffsets, (Object)recentOffsets);
                Seq nextOffsets2 = (Seq)tuple22._1();
                Seq recentOffsets2 = (Seq)tuple22._2();
                // Merge the entries accepted by the filter helpers into the tracked offsets;
                // `.get()` is then applied to each retained Option.
                this.availableOffsets_$eq(this.availableOffsets().$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>)((TraversableOnce)((TraversableLike)nextOffsets2.filter((Function1 & Serializable & scala.Serializable)x0$2 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$constructNextBatch$7(x0$2)))).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p._1()), ((Option)p._2()).get()), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())));
                this.latestOffsets_$eq(this.latestOffsets().$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>)((TraversableOnce)((TraversableLike)recentOffsets2.filter((Function1 & Serializable & scala.Serializable)x0$3 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$constructNextBatch$9(x0$3)))).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p._1()), ((Option)p._2()).get()), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())));
                // Refresh metadata: current watermark and trigger-clock time; third field unchanged.
                OffsetSeqMetadata qual$1 = this.offsetSeqMetadata();
                long x$1 = this.watermarkTracker().currentWatermark();
                long x$2 = this.super$triggerClock().getTimeMillis();
                scala.collection.immutable.Map<String, String> x$3 = qual$1.copy$default$3();
                this.offsetSeqMetadata_$eq(qual$1.copy(x$1, x$2, x$3));
                boolean lastExecutionRequiresAnotherBatch = noDataBatchesEnabled && Option$.MODULE$.apply((Object)this.lastExecution()).exists((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)x$6.shouldRunAnotherBatch(this.offsetSeqMetadata())));
                boolean shouldConstructNextBatch = this.isNewDataAvailable() || lastExecutionRequiresAnotherBatch;
                this.logTrace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(113).append("noDataBatchesEnabled = ").append(noDataBatchesEnabled).append(", ").append("lastExecutionRequiresAnotherBatch = ").append(lastExecutionRequiresAnotherBatch).append(", ").append("isNewDataAvailable = ").append(this.isNewDataAvailable()).append(", ").append("shouldConstructNextBatch = ").append(shouldConstructNextBatch).toString());
                if (shouldConstructNextBatch) {
                    this.updateStatusMessage("Writing offsets to log");
                    this.reportTimeTaken("walCommit", (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                        // offsetLog().add returning false is treated as a concurrent writer.
                        Predef$.MODULE$.assert(this.offsetLog().add(this.currentBatchId(), this.availableOffsets().toOffsetSeq(this.sources(), this.offsetSeqMetadata())), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(67).append("Concurrent update to the log. Multiple streaming jobs detected for ").append(this.currentBatchId()).toString());
                        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(39).append("Committed offsets for batch ").append(this.currentBatchId()).append(". ").append("Metadata ").append(this.offsetSeqMetadata().toString()).toString());
                        if (this.currentBatchId() != 0L) {
                            Option<OffsetSeq> prevBatchOff = this.offsetLog().get(this.currentBatchId() - 1L);
                            if (prevBatchOff.isDefined()) {
                                // Per-source handling of the previous batch's offsets lives in a
                                // synthetic helper not visible here.
                                // NOTE(review): presumably commits them to the sources -- confirm.
                                ((OffsetSeq)prevBatchOff.get()).toStreamProgress(this.sources()).foreach((Function1 & Serializable & scala.Serializable)x0$4 -> {
                                    MicroBatchExecution.$anonfun$constructNextBatch$16(x0$4);
                                    return BoxedUnit.UNIT;
                                });
                            } else {
                                throw new IllegalStateException(new StringBuilder(20).append("batch ").append(this.currentBatchId() - 1L).append(" doesn't exist").toString());
                            }
                        }
                        // Purge once the batch id exceeds the number of log entries to maintain.
                        if ((long)this.minLogEntriesToMaintain() < this.currentBatchId()) {
                            this.purge(this.currentBatchId() - (long)this.minLogEntriesToMaintain());
                            return;
                        }
                    });
                    this.noNewData_$eq(false);
                } else {
                    this.noNewData_$eq(true);
                    // Let threads waiting on the progress condition observe the "no new data" state.
                    this.awaitProgressLockCondition().signalAll();
                }
                return shouldConstructNextBatch;
            }));
        }
        catch (NonLocalReturnControl ex) {
            // Fix: only rethrow control exceptions that belong to another frame. Previously
            // the exception was rethrown unconditionally, so the early "already constructed"
            // return surfaced as an exception instead of the value `true`.
            if (ex.key() != object) {
                throw ex;
            }
            bl = ex.value$mcZ$sp();
        }
        return bl;
    }

    private void runBatch(SparkSession sparkSessionToRunBatch) {
        LogicalPlan logicalPlan2;
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(14).append("Running batch ").append(this.currentBatchId()).toString());
        Map mutableNewData = Map$.MODULE$.empty().$plus$plus((GenTraversableOnce)this.reportTimeTaken("getBatch", (Function0 & Serializable & scala.Serializable)() -> (scala.collection.immutable.Map)this.availableOffsets().flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                SparkDataStream source = (SparkDataStream)tuple2._1();
                org.apache.spark.sql.connector.read.streaming.Offset available = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
                if (source instanceof Source) {
                    Source source2 = (Source)source;
                    if (available instanceof Offset) {
                        Offset offset = (Offset)available;
                        if (BoxesRunTime.unboxToBoolean((Object)this.committedOffsets().get(source2).map((Function1 & Serializable & scala.Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$runBatch$4(offset, x$7))).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> true))) {
                            Option current = this.committedOffsets().get(source2).map((Function1 & Serializable & scala.Serializable)x$8 -> (Offset)((Object)((Object)((Object)x$8))));
                            Dataset<Row> batch = source2.getBatch((Option<Offset>)current, offset);
                            Predef$.MODULE$.assert(batch.isStreaming(), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(67).append("DataFrame returned by getBatch from ").append(source2).append(" did not have isStreaming=true\n").append(batch.queryExecution().logical()).toString());
                            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(27).append("Retrieving data from ").append(source2).append(": ").append(current).append(" -> ").append((Object)offset).toString());
                            return Option$.MODULE$.option2Iterable((Option)new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)source2), (Object)batch.logicalPlan())));
                        }
                    }
                }
            }
            if (tuple2 != null) {
                SparkDataStream stream = (SparkDataStream)tuple2._1();
                org.apache.spark.sql.connector.read.streaming.Offset available = (org.apache.spark.sql.connector.read.streaming.Offset)tuple2._2();
                if (stream instanceof MicroBatchStream) {
                    MicroBatchStream microBatchStream = (MicroBatchStream)stream;
                    if (BoxesRunTime.unboxToBoolean((Object)this.committedOffsets().get((SparkDataStream)microBatchStream).map((Function1 & Serializable & scala.Serializable)x$9 -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$runBatch$9(available, x$9))).getOrElse((Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> true))) {
                        org.apache.spark.sql.connector.read.streaming.Offset offset;
                        Option current = this.committedOffsets().get((SparkDataStream)microBatchStream).map((Function1 & Serializable & scala.Serializable)off -> microBatchStream.deserializeOffset(off.json()));
                        org.apache.spark.sql.connector.read.streaming.Offset offset2 = available;
                        if (offset2 instanceof SerializedOffset) {
                            SerializedOffset serializedOffset = (SerializedOffset)offset2;
                            offset = microBatchStream.deserializeOffset(serializedOffset.json());
                        } else if (offset2 != null) {
                            org.apache.spark.sql.connector.read.streaming.Offset offset3;
                            offset = offset3 = offset2;
                        } else {
                            throw new MatchError((Object)offset2);
                        }
                        org.apache.spark.sql.connector.read.streaming.Offset endOffset = offset;
                        org.apache.spark.sql.connector.read.streaming.Offset startOffset = (org.apache.spark.sql.connector.read.streaming.Offset)current.getOrElse((Function0 & Serializable & scala.Serializable)() -> microBatchStream.initialOffset());
                        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(27).append("Retrieving data from ").append(microBatchStream).append(": ").append(current).append(" -> ").append(endOffset).toString());
                        return Option$.MODULE$.option2Iterable((Option)new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)microBatchStream), (Object)new OffsetHolder(startOffset, endOffset))));
                    }
                }
            }
            return Option$.MODULE$.option2Iterable((Option)None$.MODULE$);
        }, scala.collection.immutable.Map$.MODULE$.canBuildFrom())));
        LogicalPlan newBatchesPlan = (LogicalPlan)this.logicalPlan().transform((PartialFunction)new scala.Serializable(null, mutableNewData){
            public static final long serialVersionUID = 0L;
            private final Map mutableNewData$1;

            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                A1 A1 = x1;
                if (A1 instanceof StreamingExecutionRelation) {
                    StreamingExecutionRelation streamingExecutionRelation = (StreamingExecutionRelation)A1;
                    SparkDataStream source = streamingExecutionRelation.source();
                    Seq<Attribute> output = streamingExecutionRelation.output();
                    return (B1)this.mutableNewData$1.get((Object)source).map((Function1 & Serializable & scala.Serializable)dataPlan -> {
                        boolean hasFileMetadata = output.exists((Function1 & Serializable & scala.Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)$anonfun$2.$anonfun$applyOrElse$9(x0$1)));
                        LogicalPlan finalDataPlan = (LogicalPlan)dataPlan.transformUp((PartialFunction)new scala.Serializable(null, hasFileMetadata){
                            public static final long serialVersionUID = 0L;
                            private final boolean hasFileMetadata$1;

                            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                                A1 A1 = x1;
                                if (A1 instanceof LogicalRelation) {
                                    LogicalRelation logicalRelation = (LogicalRelation)A1;
                                    if (this.hasFileMetadata$1) {
                                        return (B1)((Object)logicalRelation.withMetadataColumns());
                                    }
                                }
                                return (B1)function1.apply(x1);
                            }

                            public final boolean isDefinedAt(LogicalPlan x1) {
                                LogicalPlan logicalPlan2 = x1;
                                return logicalPlan2 instanceof LogicalRelation && this.hasFileMetadata$1;
                            }
                            {
                                this.hasFileMetadata$1 = hasFileMetadata$1;
                            }
                        });
                        $this.mutableNewData$1.put((Object)source, (Object)finalDataPlan);
                        int maxFields = SQLConf$.MODULE$.get().maxToStringFields();
                        Predef$.MODULE$.assert(output.size() == finalDataPlan.output().size(), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(19).append("Invalid batch: ").append(org.apache.spark.sql.catalyst.util.package$.MODULE$.truncatedString(output, ",", maxFields)).append(" != ").append(org.apache.spark.sql.catalyst.util.package$.MODULE$.truncatedString(finalDataPlan.output(), ",", maxFields)).toString());
                        Seq aliases = (Seq)((TraversableLike)output.zip((GenIterable)finalDataPlan.output(), Seq$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)x0$2 -> {
                            Tuple2 tuple2 = x0$2;
                            if (tuple2 != null) {
                                Attribute from;
                                Attribute to = (Attribute)tuple2._1();
                                Attribute x$1 = from = (Attribute)tuple2._2();
                                String x$2 = to.name();
                                ExprId x$3 = to.exprId();
                                Some x$4 = new Some((Object)from.metadata());
                                Seq x$5 = Alias$.MODULE$.apply$default$4((Expression)x$1, x$2);
                                Seq x$6 = Alias$.MODULE$.apply$default$6((Expression)x$1, x$2);
                                return new Alias((Expression)x$1, x$2, x$3, x$5, (Option)x$4, x$6);
                            }
                            throw new MatchError((Object)tuple2);
                        }, Seq$.MODULE$.canBuildFrom());
                        return new Project(aliases, finalDataPlan);
                    }).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                        Seq x$7 = output;
                        boolean x$8 = true;
                        Seq x$9 = LocalRelation$.MODULE$.apply$default$2();
                        return new LocalRelation(x$7, x$9, x$8);
                    });
                }
                if (A1 instanceof StreamingDataSourceV2Relation) {
                    StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                    return (B1)this.mutableNewData$1.get((Object)streamingDataSourceV2Relation.stream()).map((Function1 & Serializable & scala.Serializable)x0$3 -> {
                        LogicalPlan logicalPlan2 = x0$3;
                        if (logicalPlan2 instanceof OffsetHolder) {
                            OffsetHolder offsetHolder = (OffsetHolder)logicalPlan2;
                            org.apache.spark.sql.connector.read.streaming.Offset start = offsetHolder.start();
                            org.apache.spark.sql.connector.read.streaming.Offset end = offsetHolder.end();
                            Some x$10 = new Some((Object)start);
                            Some x$11 = new Some((Object)end);
                            Seq x$12 = streamingDataSourceV2Relation.copy$default$1();
                            Scan x$13 = streamingDataSourceV2Relation.copy$default$2();
                            SparkDataStream x$14 = streamingDataSourceV2Relation.copy$default$3();
                            return streamingDataSourceV2Relation.copy(x$12, x$13, x$14, (Option)x$10, (Option)x$11);
                        }
                        throw new MatchError((Object)logicalPlan2);
                    }).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                        Seq x$15 = streamingDataSourceV2Relation.output();
                        boolean x$16 = true;
                        Seq x$17 = LocalRelation$.MODULE$.apply$default$2();
                        return new LocalRelation(x$15, x$17, x$16);
                    });
                }
                return (B1)function1.apply(x1);
            }

            /**
             * Returns true when the plan node is a {@code StreamingExecutionRelation} or a
             * {@code StreamingDataSourceV2Relation}.
             */
            public final boolean isDefinedAt(LogicalPlan x1) {
                return x1 instanceof StreamingExecutionRelation
                    || x1 instanceof StreamingDataSourceV2Relation;
            }

            /**
             * Returns true when the attribute is an {@code AttributeReference} for which
             * {@code FileSourceMetadataAttribute.unapply} yields a non-empty result.
             */
            public static final /* synthetic */ boolean $anonfun$applyOrElse$9(Attribute x0$1) {
                if (!(x0$1 instanceof AttributeReference)) {
                    return false;
                }
                Option extracted = FileSourceMetadataAttribute$.MODULE$.unapply((AttributeReference)x0$1);
                return !extracted.isEmpty();
            }
            {
                this.mutableNewData$1 = mutableNewData$1;
            }

            /**
             * Synthetic lambda-deserialization hook: delegates to
             * {@code LambdaDeserialize.bootstrap} with the serialized lambda and an array
             * naming this class's lambda implementation methods.
             *
             * NOTE(review): the array entries are decompiler pseudo-syntax (method name
             * followed by parameter types), not compilable Java -- presumably they stand
             * for method-handle constants in the bytecode; confirm against the class file.
             */
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$8(org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2 scala.collection.Seq org.apache.spark.sql.connector.read.streaming.SparkDataStream org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ), $anonfun$applyOrElse$12(scala.collection.Seq ), $anonfun$applyOrElse$13(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ), $anonfun$applyOrElse$14(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation ), $anonfun$applyOrElse$9$adapted(org.apache.spark.sql.catalyst.expressions.Attribute ), $anonfun$applyOrElse$10(scala.collection.Seq int org.apache.spark.sql.catalyst.plans.logical.LogicalPlan ), $anonfun$applyOrElse$11(scala.Tuple2 )}, serializedLambda);
            }
        });
        this.newData_$eq((scala.collection.immutable.Map<SparkDataStream, LogicalPlan>)mutableNewData.toMap(Predef$.MODULE$.$conforms()));
        LogicalPlan newAttributePlan = newBatchesPlan.transformAllExpressionsWithPruning((Function1 & Serializable & scala.Serializable)x$10 -> BoxesRunTime.boxToBoolean((boolean)x$10.containsPattern(TreePattern$.MODULE$.CURRENT_LIKE())), newBatchesPlan.transformAllExpressionsWithPruning$default$2(), (PartialFunction)new scala.Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ MicroBatchExecution $outer;

            /**
             * Replaces {@code CurrentTimestamp}, {@code LocalTimestamp} and
             * {@code CurrentDate} expressions with a {@code CurrentBatchTimestamp} built
             * from the enclosing execution's batch timestamp; any other expression is
             * passed to the fallback function.
             */
            public final <A1 extends Expression, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                Object rewritten;
                if (x2 instanceof CurrentTimestamp) {
                    CurrentTimestamp ts = (CurrentTimestamp)x2;
                    // This branch supplies a fixed placeholder string as the time zone id.
                    rewritten = new CurrentBatchTimestamp(this.$outer.offsetSeqMetadata().batchTimestampMs(), ts.dataType(), (Option)new Some((Object)"Dummy TimeZoneId"));
                } else if (x2 instanceof LocalTimestamp) {
                    LocalTimestamp local = (LocalTimestamp)x2;
                    rewritten = new CurrentBatchTimestamp(this.$outer.offsetSeqMetadata().batchTimestampMs(), local.dataType(), local.timeZoneId());
                } else if (x2 instanceof CurrentDate) {
                    CurrentDate date = (CurrentDate)x2;
                    rewritten = new CurrentBatchTimestamp(this.$outer.offsetSeqMetadata().batchTimestampMs(), date.dataType(), date.timeZoneId());
                } else {
                    rewritten = function1.apply(x2);
                }
                return (B1)rewritten;
            }

            /**
             * Returns true for the three expression kinds this partial function rewrites:
             * {@code CurrentTimestamp}, {@code LocalTimestamp} and {@code CurrentDate}.
             */
            public final boolean isDefinedAt(Expression x2) {
                return x2 instanceof CurrentTimestamp
                    || x2 instanceof LocalTimestamp
                    || x2 instanceof CurrentDate;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        Table table = this.sink();
        if (table instanceof Sink) {
            logicalPlan2 = newAttributePlan;
        } else if (table instanceof SupportsWrite) {
            logicalPlan2 = ((WriteToMicroBatchDataSource)newAttributePlan).withNewBatchId(this.currentBatchId());
        } else {
            throw new IllegalArgumentException(new StringBuilder(22).append("unknown sink type for ").append(this.sink()).toString());
        }
        LogicalPlan triggerLogicalPlan = logicalPlan2;
        sparkSessionToRunBatch.sparkContext().setLocalProperty(MicroBatchExecution$.MODULE$.BATCH_ID_KEY(), Long.toString(this.currentBatchId()));
        sparkSessionToRunBatch.sparkContext().setLocalProperty(StreamExecution$.MODULE$.IS_CONTINUOUS_PROCESSING(), Boolean.toString(false));
        this.reportTimeTaken("queryPlanning", (Function0 & Serializable & scala.Serializable)() -> {
            this.lastExecution_$eq(new IncrementalExecution(sparkSessionToRunBatch, triggerLogicalPlan, this.outputMode(), this.checkpointFile("state"), this.id(), this.runId(), this.currentBatchId(), this.offsetSeqMetadata()));
            return this.lastExecution().executedPlan();
        });
        Dataset nextBatch = new Dataset(this.lastExecution(), RowEncoder$.MODULE$.apply(this.lastExecution().analyzed().schema()));
        Option batchSinkProgress = (Option)this.reportTimeTaken("addBatch", (Function0 & Serializable & scala.Serializable)() -> (Option)SQLExecution$.MODULE$.withNewExecutionId(this.lastExecution(), SQLExecution$.MODULE$.withNewExecutionId$default$2(), (Function0 & Serializable & scala.Serializable)() -> {
            Table table = this.sink();
            if (table instanceof Sink) {
                Sink sink = (Sink)table;
                sink.addBatch(this.currentBatchId(), nextBatch);
            } else if (table instanceof SupportsWrite) {
                nextBatch.collect();
            } else {
                throw new MatchError((Object)table);
            }
            SparkPlan sparkPlan = this.lastExecution().executedPlan();
            if (sparkPlan instanceof WriteToDataSourceV2Exec) {
                WriteToDataSourceV2Exec writeToDataSourceV2Exec = (WriteToDataSourceV2Exec)sparkPlan;
                return writeToDataSourceV2Exec.commitProgress();
            }
            return None$.MODULE$;
        }));
        this.withProgressLocked((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.sinkCommitProgress_$eq((Option<StreamWriterCommitProgress>)batchSinkProgress);
            this.watermarkTracker().updateWatermark(this.lastExecution().executedPlan());
            Predef$.MODULE$.assert(this.commitLog().add(this.currentBatchId(), new CommitMetadata(this.watermarkTracker().currentWatermark())), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(74).append("Concurrent update to the commit log. Multiple streaming jobs detected for ").append(this.currentBatchId()).toString());
            this.committedOffsets_$eq(this.committedOffsets().$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>)this.availableOffsets()));
        });
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(16).append("Completed batch ").append(this.currentBatchId()).toString());
    }

    /**
     * Evaluates {@code f} while holding the lock returned by {@code awaitProgressLock()}.
     * The lock is released in a {@code finally} block, so it is released even if
     * {@code f} throws.
     *
     * @param f the computation to run under the lock
     * @return whatever {@code f} returns
     */
    public <T> T withProgressLocked(Function0<T> f) {
        this.awaitProgressLock().lock();
        try {
            return (T)f.apply();
        }
        finally {
            this.awaitProgressLock().unlock();
        }
    }

    /**
     * Synthetic lambda body (by its name, from {@code populateStartOffsets}) that
     * reinitialises the execution's offset metadata and watermark tracker from a
     * given {@code OffsetSeqMetadata}.
     *
     * @param $this the execution whose state is updated
     * @param sparkSessionToRunBatches$1 session whose conf is read and passed along
     * @param metadata source of the batch watermark and batch timestamp
     */
    public static final /* synthetic */ void $anonfun$populateStartOffsets$3(MicroBatchExecution $this, SparkSession sparkSessionToRunBatches$1, OffsetSeqMetadata metadata) {
        // NOTE(review): presumably copies settings recorded in the metadata into the session conf -- confirm in OffsetSeqMetadata.
        OffsetSeqMetadata$.MODULE$.setSessionConf(metadata, sparkSessionToRunBatches$1.conf());
        // Rebuild the execution's metadata from the given watermark/timestamp and the session conf.
        $this.offsetSeqMetadata_$eq(OffsetSeqMetadata$.MODULE$.apply(metadata.batchWatermarkMs(), metadata.batchTimestampMs(), sparkSessionToRunBatches$1.conf()));
        // Install a fresh tracker built from the session conf, then seed it with the given watermark.
        $this.watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSessionToRunBatches$1.conf()));
        $this.watermarkTracker().setWatermark(metadata.batchWatermarkMs());
    }

    /**
     * Null-safe equality check: true when {@code x$4} equals {@code x3$1}, or when
     * both are null.
     */
    public static final /* synthetic */ boolean $anonfun$populateStartOffsets$8(Offset x3$1, Offset x$4) {
        if (x$4 == null) {
            return x3$1 == null;
        }
        return ((Object)x$4).equals((Object)x3$1);
    }

    /**
     * Null-safe inequality check: true when the committed offset differs from the
     * available offset (exactly one is null, or {@code equals} reports a difference).
     */
    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$2(org.apache.spark.sql.connector.read.streaming.Offset available$1, org.apache.spark.sql.connector.read.streaming.Offset committed) {
        if (committed != null) {
            return !committed.equals(available$1);
        }
        return available$1 != null;
    }

    /**
     * For one (stream, available offset) pair, reports whether the available offset
     * differs from the offset committed for that stream. A stream with no committed
     * offset yields true.
     *
     * @throws MatchError if the pair is null
     */
    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$1(MicroBatchExecution $this, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        SparkDataStream stream = (SparkDataStream)x0$1._1();
        org.apache.spark.sql.connector.read.streaming.Offset available = (org.apache.spark.sql.connector.read.streaming.Offset)x0$1._2();
        // Compare against the committed offset when one exists for this stream.
        Option differsFromCommitted = $this.committedOffsets().get(stream).map((Function1 & Serializable & scala.Serializable)committed -> BoxesRunTime.boxToBoolean((boolean)MicroBatchExecution.$anonfun$isNewDataAvailable$2(available, committed)));
        return BoxesRunTime.unboxToBoolean((Object)differsFromCommitted.getOrElse((Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> true));
    }

    /**
     * Returns true when the second element of the pair is a non-empty {@code Option}.
     *
     * @throws MatchError if the pair is null
     */
    public static final /* synthetic */ boolean $anonfun$constructNextBatch$7(Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError((Object)x0$2);
        }
        return ((Option)x0$2._2()).nonEmpty();
    }

    /**
     * Returns true when the second element of the pair is a non-empty {@code Option}.
     *
     * @throws MatchError if the pair is null
     */
    public static final /* synthetic */ boolean $anonfun$constructNextBatch$9(Tuple2 x0$3) {
        if (x0$3 == null) {
            throw new MatchError((Object)x0$3);
        }
        Option candidate = (Option)x0$3._2();
        return candidate.nonEmpty();
    }

    /**
     * Commits the offset in a (stream, offset) pair to its stream.
     *
     * <ul>
     *   <li>A {@code Source} paired with an {@code Offset} is committed directly.</li>
     *   <li>A {@code MicroBatchStream} is committed with the offset re-created from
     *       its JSON form via {@code deserializeOffset}.</li>
     * </ul>
     *
     * @throws IllegalArgumentException if the stream matches neither case
     * @throws MatchError if the pair is null
     */
    public static final /* synthetic */ void $anonfun$constructNextBatch$16(Tuple2 x0$4) {
        if (x0$4 == null) {
            throw new MatchError((Object)x0$4);
        }
        SparkDataStream stream = (SparkDataStream)x0$4._1();
        org.apache.spark.sql.connector.read.streaming.Offset committedOffset = (org.apache.spark.sql.connector.read.streaming.Offset)x0$4._2();
        if (stream instanceof Source && committedOffset instanceof Offset) {
            ((Source)stream).commit((Offset)committedOffset);
        } else if (stream instanceof MicroBatchStream) {
            MicroBatchStream microBatch = (MicroBatchStream)stream;
            microBatch.commit(microBatch.deserializeOffset(committedOffset.json()));
        } else {
            throw new IllegalArgumentException(new StringBuilder(47).append("Unknown source is found at constructNextBatch: ").append(stream).toString());
        }
    }

    /**
     * Null-safe inequality check: true when {@code x$7} differs from {@code x3$3}
     * (exactly one is null, or {@code equals} reports a difference).
     */
    public static final /* synthetic */ boolean $anonfun$runBatch$4(Offset x3$3, org.apache.spark.sql.connector.read.streaming.Offset x$7) {
        if (x$7 != null) {
            return !x$7.equals((Object)x3$3);
        }
        return x3$3 != null;
    }

    /**
     * Null-safe inequality check: true when {@code x$9} differs from
     * {@code available$2} (exactly one is null, or {@code equals} reports a difference).
     */
    public static final /* synthetic */ boolean $anonfun$runBatch$9(org.apache.spark.sql.connector.read.streaming.Offset available$2, org.apache.spark.sql.connector.read.streaming.Offset x$9) {
        if (x$9 != null) {
            return !x$9.equals(available$2);
        }
        return available$2 != null;
    }

    /**
     * Creates a micro-batch execution for the given {@code WriteToStream} plan.
     *
     * <p>Stores {@code extraOptions} and {@code plan}, passes the plan's name,
     * resolved checkpoint location, input query, sink, output mode and
     * delete-checkpoint-on-stop flag to the superclass constructor, starts with an
     * empty source list, and picks the trigger executor from the trigger's type.
     *
     * @param sparkSession session forwarded to the superclass
     * @param trigger trigger that determines which executor is created
     * @param triggerClock clock forwarded to the superclass (and to the processing-time executor)
     * @param extraOptions options stored on this instance
     * @param plan the write-to-stream plan this execution is built from
     * @throws IllegalStateException if the trigger matches none of the handled kinds
     */
    public MicroBatchExecution(SparkSession sparkSession, Trigger trigger, Clock triggerClock, scala.collection.immutable.Map<String, String> extraOptions, WriteToStream plan) {
        Product product;
        // NOTE(review): the field stores before super(...) mirror the decompiled bytecode order; do not reorder.
        this.extraOptions = extraOptions;
        this.plan = plan;
        super(sparkSession, plan.name(), plan.resolvedCheckpointLocation(), plan.inputQuery(), plan.sink(), trigger, triggerClock, plan.outputMode(), plan.deleteCheckpointOnStop());
        this.sources = (Seq)Nil$.MODULE$;
        // Choose the executor that drives batches, based on the trigger held by the superclass.
        Trigger trigger2 = super.trigger();
        if (trigger2 instanceof ProcessingTimeTrigger) {
            ProcessingTimeTrigger processingTimeTrigger = (ProcessingTimeTrigger)trigger2;
            product = new ProcessingTimeExecutor(processingTimeTrigger, super.triggerClock());
        } else if (OneTimeTrigger$.MODULE$.equals(trigger2)) {
            product = new SingleBatchExecutor();
        } else if (AvailableNowTrigger$.MODULE$.equals(trigger2)) {
            product = new MultiBatchExecutor();
        } else {
            throw new IllegalStateException(new StringBuilder(25).append("Unknown type of trigger: ").append(super.trigger()).toString());
        }
        this.triggerExecutor = product;
        // No batch has been constructed yet for a freshly created execution.
        this.isCurrentBatchConstructed = false;
    }
}

