/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.continuous;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.apache.spark.SparkEnv$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.CurrentDate;
import org.apache.spark.sql.catalyst.expressions.CurrentTimestampLike;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.LocalTimestamp;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2;
import org.apache.spark.sql.catalyst.streaming.WriteToStream;
import org.apache.spark.sql.catalyst.trees.TreePattern$;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import org.apache.spark.sql.errors.QueryCompilationErrors$;
import org.apache.spark.sql.errors.QueryExecutionErrors$;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits$;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation$;
import org.apache.spark.sql.execution.streaming.ACTIVE$;
import org.apache.spark.sql.execution.streaming.AcceptsLatestSeenOffsetHandler$;
import org.apache.spark.sql.execution.streaming.CommitMetadata;
import org.apache.spark.sql.execution.streaming.CommitMetadata$;
import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.IncrementalExecution;
import org.apache.spark.sql.execution.streaming.OffsetSeq;
import org.apache.spark.sql.execution.streaming.OffsetSeq$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.execution.streaming.RECONFIGURING$;
import org.apache.spark.sql.execution.streaming.State;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamExecution$;
import org.apache.spark.sql.execution.streaming.TERMINATED$;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$;
import org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.continuous.IncrementAndGetEpoch$;
import org.apache.spark.sql.execution.streaming.continuous.StopContinuousExecutionWrites$;
import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSource;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Clock;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LambdaDeserialize;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t\u0005a\u0001\u0002\u0014(\u0001YB\u0011b\u000f\u0001\u0003\u0002\u0003\u0006I\u0001\u0010!\t\u0013\u0005\u0003!\u0011!Q\u0001\n\t;\u0005\"\u0003%\u0001\u0005\u0003\u0005\u000b\u0011B%P\u0011!\u0001\u0006A!A!\u0002\u0013\t\u0006\u0002C1\u0001\u0005\u0003\u0005\u000b\u0011\u00022\t\u000b%\u0004A\u0011\u00016\t\u000fI\u0004\u0001\u0019!C\tg\"I\u0011Q\u0002\u0001A\u0002\u0013E\u0011q\u0002\u0005\b\u0003;\u0001\u0001\u0015)\u0003u\u00111\t9\u0003\u0001a\u0001\u0002\u0004%\t!LA\u0015\u00111\tY\u0003\u0001a\u0001\u0002\u0004%\t!LA\u0017\u0011)\t\t\u0004\u0001a\u0001\u0002\u0003\u0006KA\u0018\u0005\n\u0003g\u0001!\u0019!C\u0005\u0003kA\u0001\"a\u0015\u0001A\u0003%\u0011q\u0007\u0005\n\u0003+\u0002!\u0019!C!\u0003/B\u0001\"a\u0018\u0001A\u0003%\u0011\u0011\f\u0005\b\u0003C\u0002A\u0011BA2\u0011\u001d\tI\b\u0001C\u0005\u0003wB\u0011\"a \u0001\u0005\u0004%I!!!\t\u0011\u0005%\u0005\u0001)A\u0005\u0003\u0007Cq!a#\u0001\t#\ni\tC\u0004\u0002\u0014\u0002!I!!&\t\u000f\u0005u\u0005\u0001\"\u0003\u0002 \"9\u0011Q\u0015\u0001\u0005\u0002\u0005\u001d\u0006bBAb\u0001\u0011\u0005\u0011Q\u0019\u0005\t\u0003\u0013\u0004A\u0011A\u0017\u0002L\"9\u0011q\u001a\u0001\u0005\u0002\u0005E\u0007bBAh\u0001\u0011%\u0011q\u001b\u0005\b\u00033\u0004A\u0011IAl\u00115\tY\u000e\u0001I\u0001\u0004\u0003\u0005I\u0011BAo\u0001\u001e9\u0011q\\\u0014\t\u0002\u0005\u0005hA\u0002\u0014(\u0011\u0003\t\u0019\u000f\u0003\u0004jA\u0011\u0005\u00111\u001e\u0005\n\u0003[\u0004#\u0019!C\u0001\u0003_D\u0001\"a?!A\u0003%\u0011\u0011\u001f\u0005\n\u0003{\u0004#\u0019!C\u0001\u0003_D\u0001\"a@!A\u0003%\u0011\u0011\u001f\u0002\u0014\u0007>tG/\u001b8v_V\u001cX\t_3dkRLwN\u001c\u0006\u0003Q%\n!bY8oi&tWo\\;t\u0015\tQ3&A\u0005tiJ,\u0017-\\5oO*\u0011A&L\u0001\nKb,7-\u001e;j_:T!AL\u0018\u0002\u0007M\fHN\u0003\u00021c\u0005)1\u000f]1sW*\u0011!gM\u0001\u0007CB\f7\r[3\u000b\u0003Q\n1a\u001c:h\u0007\u0001\u0019\"\u0001A\u001c\u0011\u0005aJT\"A\u0015\n\u0005iJ#aD*ue\u0016\fW.\u0012=fGV$\u0018n\u001c8\u0002\u0019M\u0004\u0018M]6TKN\u001c\u0018n\u001c8\u0011\u0005urT\"A\u0017\n\u0005}j#\u0001D*qCJ\\7+Z:tS>t\u0017BA\u001e:\u0003\u001d!(/[4hKJ\u0004\"aQ#\u000e\u0003\u0011S!AK\u0017\n\u0005\u0019#%a\u0002+sS\u001e<WM]\u0005\u0003\u0003f\nA\u0002\u001e:jO\u001e,'o\u00117pG.\u0004\"AS'\u000e\u0003-S!\u0001T\u0018\u0002\tU$\u0018\u000e\\\u0005\u0003\u001d.\u0013Qa\u00117pG.L!\u0001S\u001d\u0002\u0019\u0015DHO]1PaRLwN\\:\u0011\tI[fL\u0018\b\u0003'f\u0003\"\u0001V,\u000e\u0003US!AV\u001b\u0002\rq\u0012xn\u001c;?\u0015\u0005A\u0016!B:dC2\f\u0017B\u0001.X\u0003\u0019\u0001&/\u001a3fM&\u0011A,\u0018\u0002\u0004\u001b\u0006\u0004(B\u0001.X!\t\u0011v,\u0003\u0002a;\n11\u000b\u001e:j]\u001e\fA\u0001\u001d7b]B\u00111mZ\u0007\u0002I*\u0011!&\u001a\u0006\u0003M6\n\u0001bY1uC2L8\u000f^\u0005\u0003Q\u0012\u0014Qb\u0016:ji\u0016$vn\u0015;sK\u0006l\u0017A\u0002\u001fj]&$h\b\u0006\u0004l[:|\u0007/\u001d\t\u0003Y\u0002i\u0011a\n\u0005\u0006w\u0019\u0001\r\u0001\u0010\u0005\u0006\u0003\u001a\u0001\rA\u0011\u0005\u0006\u0011\u001a\u0001\r!\u0013\u0005\u0006!\u001a\u0001\r!\u0015\u0005\u0006C\u001a\u0001\rAY\u0001\bg>,(oY3t+\u0005!\bcA;{{:\u0011a\u000f\u001f\b\u0003)^L\u0011\u0001W\u0005\u0003s^\u000bq\u0001]1dW\u0006<W-\u0003\u0002|y\n\u00191+Z9\u000b\u0005e<\u0006c\u0001@\u0002\n5\tqPC\u0002+\u0003\u0003QA!a\u0001\u0002\u0006\u0005!!/Z1e\u0015\r\t9!L\u0001\nG>tg.Z2u_JL1!a\u0003\u0000\u0005A\u0019uN\u001c;j]V|Wo]*ue\u0016\fW.A\u0006t_V\u00148-Z:`I\u0015\fH\u0003BA\t\u00033\u0001B!a\u0005\u0002\u00165\tq+C\u0002\u0002\u0018]\u0013A!\u00168ji\"A\u00111\u0004\u0005\u0002\u0002\u0003\u0007A/A\u0002yIE\n\u0001b]8ve\u000e,7\u000f\t\u0015\u0004\u0013\u0005\u0005\u0002\u0003BA\n\u0003GI1!!\nX\u0005!1x\u000e\\1uS2,\u0017!G2veJ,g\u000e^#q_\u000eD7i\\8sI&t\u0017\r^8s\u0013\u0012,\u0012AX\u0001\u001eGV\u0014(/\u001a8u\u000bB|7\r[\"p_J$\u0017N\\1u_JLEm\u0018\u0013fcR!\u0011\u0011CA\u0018\u0011!\tYbCA\u0001\u0002\u0004q\u0016AG2veJ,g\u000e^#q_\u000eD7i\\8sI&t\u0017\r^8s\u0013\u0012\u0004\u0013a\u00024bS2,(/Z\u000b\u0003\u0003o\u0001b!!\u000f\u0002J\u00055SBAA\u001e\u0015\u0011\ti$a\u0010\u0002\r\u0005$x.\\5d\u0015\u0011\t\t%a\u0011\u0002\u0015\r|gnY;se\u0016tGOC\u0002M\u0003\u000bR!!a\u0012\u0002\t)\fg/Y\u0005\u0005\u0003\u0017\nYDA\bBi>l\u0017n\u0019*fM\u0016\u0014XM\\2f!\r)\u0018qJ\u0005\u0004\u0003#b(!\u0003+ie><\u0018M\u00197f\u0003!1\u0017-\u001b7ve\u0016\u0004\u0013a\u00037pO&\u001c\u0017\r\u001c)mC:,\"!!\u0017\u0011\u00071\fY&C\u0002\u0002^\u001d\u00121d\u0016:ji\u0016$vnQ8oi&tWo\\;t\t\u0006$\u0018mU8ve\u000e,\u0017\u0001\u00047pO&\u001c\u0017\r\u001c)mC:\u0004\u0013a\u00075bg\u0012K7\u000f\u001e:jEV$\u0018n\u001c8SKF,\u0018N]3nK:$8\u000f\u0006\u0003\u0002f\u0005-\u0004\u0003BA\n\u0003OJ1!!\u001bX\u0005\u001d\u0011un\u001c7fC:Dq!!\u001c\u0012\u0001\u0004\ty'A\u0003xe&$X\r\u0005\u0003\u0002r\u0005UTBAA:\u0015\u0011\ti'!\u0002\n\t\u0005]\u00141\u000f\u0002\u0006/JLG/Z\u0001\u0018Q\u0006\u001cxJ\u001d3fe&twMU3rk&\u0014X-\\3oiN$B!!\u001a\u0002~!9\u0011Q\u000e\nA\u0002\u0005=\u0014a\u0004;sS\u001e<WM]#yK\u000e,Ho\u001c:\u0016\u0005\u0005\r\u0005c\u0001\u001d\u0002\u0006&\u0019\u0011qQ\u0015\u0003-A\u0013xnY3tg&tw\rV5nK\u0016CXmY;u_J\f\u0001\u0003\u001e:jO\u001e,'/\u0012=fGV$xN\u001d\u0011\u0002%I,h.Q2uSZ\fG/\u001a3TiJ,\u0017-\u001c\u000b\u0005\u0003#\ty\t\u0003\u0004\u0002\u0012V\u0001\r\u0001P\u0001\u0016gB\f'o[*fgNLwN\u001c$peN#(/Z1n\u0003=9W\r^*uCJ$xJ\u001a4tKR\u001cHCAAL!\rA\u0014\u0011T\u0005\u0004\u00037K#!C(gMN,GoU3r\u00035\u0011XO\\\"p]RLg.^8vgR!\u0011\u0011CAQ\u0011\u0019\t\u0019k\u0006a\u0001y\u0005!2\u000f]1sWN+7o]5p]\u001a{'/U;fef\f\u0011\"\u00193e\u001f\u001a47/\u001a;\u0015\u0011\u0005E\u0011\u0011VAZ\u0003oCq!a+\u0019\u0001\u0004\ti+A\u0003fa>\u001c\u0007\u000e\u0005\u0003\u0002\u0014\u0005=\u0016bAAY/\n!Aj\u001c8h\u0011\u0019\t)\f\u0007a\u0001{\u000611\u000f\u001e:fC6Dq!!/\u0019\u0001\u0004\tY,\u0001\tqCJ$\u0018\u000e^5p]>3gm]3ugB!QO_A_!\rq\u0018qX\u0005\u0004\u0003\u0003|(a\u0004)beRLG/[8o\u001f\u001a47/\u001a;\u0002\r\r|W.\\5u)\u0011\t\t\"a2\t\u000f\u0005-\u0016\u00041\u0001\u0002.\u0006Q\u0011m^1ji\u0016\u0003xn\u00195\u0015\t\u0005E\u0011Q\u001a\u0005\b\u0003WS\u0002\u0019AAW\u0003=\u0019Ho\u001c9J]:+w\u000f\u00165sK\u0006$G\u0003BA\t\u0003'Dq!!6\u001c\u0001\u0004\ti%A\u0003feJ|'\u000f\u0006\u0002\u0002\u0012\u0005!1\u000f^8q\u0003I\u0019X\u000f]3sIM\u0004\u0018M]6TKN\u001c\u0018n\u001c8\u0016\u0003q\n1cQ8oi&tWo\\;t\u000bb,7-\u001e;j_:\u0004\"\u0001\u001c\u0011\u0014\u0007\u0001\n)\u000f\u0005\u0003\u0002\u0014\u0005\u001d\u0018bAAu/\n1\u0011I\\=SK\u001a$\"!!9\u0002\u001fM#\u0016I\u0015+`\u000bB{5\tS0L\u000bf+\"!!=\u0011\t\u0005M\u0018\u0011`\u0007\u0003\u0003kTA!a>\u0002F\u0005!A.\u00198h\u0013\r\u0001\u0017Q_\u0001\u0011'R\u000b%\u000bV0F!>\u001b\u0005jX&F3\u0002\n\u0001$\u0012)P\u0007\"{6iT(S\t&s\u0015\tV(S?&#ulS#Z\u0003e)\u0005kT\"I?\u000e{uJ\u0015#J\u001d\u0006#vJU0J\t~[U)\u0017\u0011")
public class ContinuousExecution
extends StreamExecution {
    private volatile Seq<ContinuousStream> sources = (Seq)Nil$.MODULE$;
    private String currentEpochCoordinatorId;
    private final AtomicReference<Throwable> failure = new AtomicReference<Object>(null);
    private final WriteToContinuousDataSource logicalPlan;
    private final ProcessingTimeExecutor org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor;

    public static String EPOCH_COORDINATOR_ID_KEY() {
        return ContinuousExecution$.MODULE$.EPOCH_COORDINATOR_ID_KEY();
    }

    public static String START_EPOCH_KEY() {
        return ContinuousExecution$.MODULE$.START_EPOCH_KEY();
    }

    private /* synthetic */ SparkSession super$sparkSession() {
        return super.sparkSession();
    }

    public Seq<ContinuousStream> sources() {
        return this.sources;
    }

    public void sources_$eq(Seq<ContinuousStream> x$1) {
        this.sources = x$1;
    }

    public String currentEpochCoordinatorId() {
        return this.currentEpochCoordinatorId;
    }

    public void currentEpochCoordinatorId_$eq(String x$1) {
        this.currentEpochCoordinatorId = x$1;
    }

    private AtomicReference<Throwable> failure() {
        return this.failure;
    }

    @Override
    public WriteToContinuousDataSource logicalPlan() {
        return this.logicalPlan;
    }

    private boolean hasDistributionRequirements(Write write) {
        RequiresDistributionAndOrdering requiresDistributionAndOrdering;
        Write write2 = write;
        if (write2 instanceof RequiresDistributionAndOrdering && (requiresDistributionAndOrdering = (RequiresDistributionAndOrdering)write2).requiredNumPartitions() == 0) {
            Distribution distribution = requiresDistributionAndOrdering.requiredDistribution();
            return !(distribution instanceof UnspecifiedDistribution);
        }
        return false;
    }

    private boolean hasOrderingRequirements(Write write) {
        RequiresDistributionAndOrdering requiresDistributionAndOrdering;
        Write write2 = write;
        return write2 instanceof RequiresDistributionAndOrdering && new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])(requiresDistributionAndOrdering = (RequiresDistributionAndOrdering)write2).requiredOrdering())).nonEmpty();
    }

    public ProcessingTimeExecutor org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor() {
        return this.org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor;
    }

    @Override
    public void runActivatedStream(SparkSession sparkSessionForStream) {
        UnaryOperator<State> stateUpdate = new UnaryOperator<State>(null){

            public State apply(State s) {
                State state = s;
                if (RECONFIGURING$.MODULE$.equals(state)) {
                    return ACTIVE$.MODULE$;
                }
                return s;
            }
        };
        while (true) {
            this.runContinuous(sparkSessionForStream);
            State state = this.state().updateAndGet(stateUpdate);
            ACTIVE$ aCTIVE$ = ACTIVE$.MODULE$;
            if (state == null) {
                if (aCTIVE$ == null) continue;
                break;
            }
            if (!state.equals(aCTIVE$)) break;
        }
        this.stopSources();
    }

    private OffsetSeq getStartOffsets() {
        Some some;
        Tuple2 tuple2;
        Option option = this.commitLog().getLatest();
        if (option instanceof Some && (tuple2 = (Tuple2)(some = (Some)option).value()) != null) {
            long latestEpochId = tuple2._1$mcJ$sp();
            this.updateStatusMessage(new StringBuilder(67).append("Starting new streaming query ").append("and getting offsets from latest epoch ").append(latestEpochId).toString());
            OffsetSeq nextOffsets = (OffsetSeq)this.offsetLog().get(latestEpochId).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                throw new IllegalStateException(new StringBuilder(47).append("Batch ").append(latestEpochId).append(" was committed without end epoch offsets!").toString());
            });
            this.committedOffsets_$eq(nextOffsets.toStreamProgress(this.sources()));
            this.currentBatchId_$eq(latestEpochId + 1L);
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(42).append("Resuming at epoch ").append(this.currentBatchId()).append(" with committed offsets ").append(this.committedOffsets()).toString());
            return nextOffsets;
        }
        if (None$.MODULE$.equals(option)) {
            this.updateStatusMessage("Starting new streaming query");
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Starting new streaming query.");
            this.currentBatchId_$eq(0L);
            return OffsetSeq$.MODULE$.fill((Seq<Offset>)((Seq)this.sources().map((Function1 & Serializable & scala.Serializable)x$1 -> null, Seq$.MODULE$.canBuildFrom())));
        }
        throw new MatchError(option);
    }

    private void runContinuous(SparkSession sparkSessionForQuery) {
        block9: {
            OffsetSeq offsets = this.getStartOffsets();
            if (this.currentBatchId() > 0L) {
                AcceptsLatestSeenOffsetHandler$.MODULE$.setLatestSeenOffsetOnSources((Option<OffsetSeq>)new Some((Object)offsets), this.sources());
            }
            LogicalPlan withNewSources = (LogicalPlan)this.logicalPlan().transform((PartialFunction)new scala.Serializable(null, offsets){
                public static final long serialVersionUID = 0L;
                private final OffsetSeq offsets$1;

                public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                    A1 A1 = x1;
                    if (A1 instanceof StreamingDataSourceV2Relation) {
                        StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                        Option loggedOffset = (Option)this.offsets$1.offsets().apply(0);
                        Option realOffset = loggedOffset.map((Function1 & Serializable & scala.Serializable)off -> streamingDataSourceV2Relation.stream().deserializeOffset(off.json()));
                        Offset startOffset = (Offset)realOffset.getOrElse((Function0 & Serializable & scala.Serializable)() -> streamingDataSourceV2Relation.stream().initialOffset());
                        Some x$1 = new Some((Object)startOffset);
                        Seq x$2 = streamingDataSourceV2Relation.copy$default$1();
                        Scan x$3 = streamingDataSourceV2Relation.copy$default$2();
                        SparkDataStream x$4 = streamingDataSourceV2Relation.copy$default$3();
                        Option x$5 = streamingDataSourceV2Relation.copy$default$5();
                        return (B1)streamingDataSourceV2Relation.copy(x$2, x$3, x$4, (Option)x$1, x$5);
                    }
                    return (B1)function1.apply(x1);
                }

                public final boolean isDefinedAt(LogicalPlan x1) {
                    LogicalPlan logicalPlan2 = x1;
                    return logicalPlan2 instanceof StreamingDataSourceV2Relation;
                }
                {
                    this.offsets$1 = offsets$1;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$3(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation org.apache.spark.sql.connector.read.streaming.Offset ), $anonfun$applyOrElse$4(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation )}, serializedLambda);
                }
            });
            withNewSources.transformAllExpressionsWithPruning((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)x$2.containsPattern(TreePattern$.MODULE$.CURRENT_LIKE())), withNewSources.transformAllExpressionsWithPruning$default$2(), (PartialFunction)new scala.Serializable(null){
                public static final long serialVersionUID = 0L;

                public final <A1 extends Expression, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                    A1 A1 = x2;
                    if (A1 instanceof CurrentTimestampLike ? true : (A1 instanceof CurrentDate ? true : A1 instanceof LocalTimestamp)) {
                        throw new IllegalStateException("CurrentTimestamp, Now, CurrentDate and LocalTimestamp not yet supported for continuous processing");
                    }
                    return (B1)function1.apply(x2);
                }

                public final boolean isDefinedAt(Expression x2) {
                    Expression expression = x2;
                    return expression instanceof CurrentTimestampLike ? true : (expression instanceof CurrentDate ? true : expression instanceof LocalTimestamp);
                }
            });
            this.reportTimeTaken("queryPlanning", (Function0 & Serializable & scala.Serializable)() -> {
                this.lastExecution_$eq(new IncrementalExecution(sparkSessionForQuery, withNewSources, this.outputMode(), this.checkpointFile("state"), this.id(), this.runId(), this.currentBatchId(), this.offsetSeqMetadata()));
                return this.lastExecution().executedPlan();
            });
            ContinuousStream stream = (ContinuousStream)withNewSources.collect((PartialFunction)new scala.Serializable(null){
                public static final long serialVersionUID = 0L;

                public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x3, Function1<A1, B1> function1) {
                    A1 A1 = x3;
                    if (A1 instanceof StreamingDataSourceV2Relation) {
                        StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                        return (B1)((ContinuousStream)streamingDataSourceV2Relation.stream());
                    }
                    return (B1)function1.apply(x3);
                }

                public final boolean isDefinedAt(LogicalPlan x3) {
                    LogicalPlan logicalPlan2 = x3;
                    return logicalPlan2 instanceof StreamingDataSourceV2Relation;
                }
            }).head();
            sparkSessionForQuery.sparkContext().setLocalProperty(StreamExecution$.MODULE$.IS_CONTINUOUS_PROCESSING(), Boolean.toString(true));
            sparkSessionForQuery.sparkContext().setLocalProperty(ContinuousExecution$.MODULE$.START_EPOCH_KEY(), Long.toString(this.currentBatchId()));
            String epochCoordinatorId = new StringBuilder(2).append(this.runId()).append("--").append(UUID.randomUUID()).toString();
            this.currentEpochCoordinatorId_$eq(epochCoordinatorId);
            sparkSessionForQuery.sparkContext().setLocalProperty(ContinuousExecution$.MODULE$.EPOCH_COORDINATOR_ID_KEY(), epochCoordinatorId);
            RpcEndpointRef epochEndpoint = EpochCoordinatorRef$.MODULE$.create(this.logicalPlan().write(), stream, this, epochCoordinatorId, this.currentBatchId(), super.sparkSession(), SparkEnv$.MODULE$.get());
            Thread epochUpdateThread = new Thread(new Runnable(this, stream, epochEndpoint){
                private final /* synthetic */ ContinuousExecution $outer;
                private final ContinuousStream stream$1;
                private final RpcEndpointRef epochEndpoint$1;

                public void run() {
                    try {
                        this.$outer.org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor().execute((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                            $this.$outer.startTrigger();
                            if ($this.stream$1.needsReconfiguration() && $this.$outer.state().compareAndSet(ACTIVE$.MODULE$, RECONFIGURING$.MODULE$)) {
                                if ($this.$outer.queryExecutionThread().isAlive()) {
                                    $this.$outer.queryExecutionThread().interrupt();
                                }
                                return false;
                            }
                            if ($this.$outer.isActive()) {
                                $this.$outer.currentBatchId_$eq(BoxesRunTime.unboxToLong((Object)$this.epochEndpoint$1.askSync((Object)IncrementAndGetEpoch$.MODULE$, ClassTag$.MODULE$.Long())));
                                $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(23).append("New epoch ").append($this.$outer.currentBatchId()).append(" is starting.").toString());
                                return true;
                            }
                            return false;
                        });
                    }
                    catch (InterruptedException interruptedException) {
                        return;
                    }
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.stream$1 = stream$1;
                    this.epochEndpoint$1 = epochEndpoint$1;
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2 ), $anonfun$run$2(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2 )}, serializedLambda);
                }
            }, new StringBuilder(24).append("epoch update thread for ").append(this.prettyIdString()).toString());
            try {
                try {
                    epochUpdateThread.setDaemon(true);
                    epochUpdateThread.start();
                    this.updateStatusMessage("Running");
                    this.reportTimeTaken("runContinuous", (Function0 & Serializable & scala.Serializable)() -> (RDD)SQLExecution$.MODULE$.withNewExecutionId(this.lastExecution(), SQLExecution$.MODULE$.withNewExecutionId$default$2(), (Function0 & Serializable & scala.Serializable)() -> this.lastExecution().executedPlan().execute()));
                    Throwable f = this.failure().get();
                    if (f != null) {
                        throw f;
                    }
                }
                catch (Throwable throwable) {
                    Throwable throwable2;
                    Throwable throwable3 = throwable;
                    if (throwable3 != null && StreamExecution$.MODULE$.isInterruptionException(throwable2 = throwable3, super.sparkSession().sparkContext())) {
                        State state = this.state().get();
                        RECONFIGURING$ rECONFIGURING$ = RECONFIGURING$.MODULE$;
                        if (!(state != null ? !state.equals(rECONFIGURING$) : rECONFIGURING$ != null)) {
                            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Query ").append(this.id()).append(" ignoring exception from reconfiguring: ").append(throwable2).toString());
                            break block9;
                        }
                    }
                    throw throwable;
                }
            }
            finally {
                this.queryExecutionThread().runUninterruptibly((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                    try {
                        epochEndpoint.askSync((Object)StopContinuousExecutionWrites$.MODULE$, ClassTag$.MODULE$.Unit());
                    }
                    finally {
                        SparkEnv$.MODULE$.get().rpcEnv().stop(epochEndpoint);
                        epochUpdateThread.interrupt();
                        epochUpdateThread.join();
                        this.super$sparkSession().sparkContext().cancelJobGroup(this.runId().toString());
                    }
                });
                Thread.interrupted();
            }
        }
    }

    public void addOffset(long epoch, ContinuousStream stream, Seq<PartitionOffset> partitionOffsets) {
        Option<OffsetSeq> option;
        Predef$.MODULE$.assert(this.sources().length() == 1, (Function0 & Serializable & scala.Serializable)() -> "only one continuous source supported currently");
        Offset globalOffset = stream.mergeOffsets((PartitionOffset[])partitionOffsets.toArray(ClassTag$.MODULE$.apply(PartitionOffset.class)));
        ContinuousExecution continuousExecution = this;
        synchronized (continuousExecution) {
            this.offsetLog().add(epoch, OffsetSeq$.MODULE$.fill((Seq<Offset>)Predef$.MODULE$.wrapRefArray((Object[])new Offset[]{globalOffset})));
            option = this.offsetLog().get(epoch - 1L);
        }
        Option<OffsetSeq> oldOffset = option;
        if (oldOffset.contains((Object)OffsetSeq$.MODULE$.fill((Seq<Offset>)Predef$.MODULE$.wrapRefArray((Object[])new Offset[]{globalOffset})))) {
            this.noNewData_$eq(true);
        }
        this.awaitProgressLock().lock();
        try {
            this.awaitProgressLockCondition().signalAll();
        }
        finally {
            this.awaitProgressLock().unlock();
        }
    }

    public void commit(long epoch) {
        block8: {
            this.updateStatusMessage(new StringBuilder(17).append("Committing epoch ").append(epoch).toString());
            Predef$.MODULE$.assert(this.sources().length() == 1, (Function0 & Serializable & scala.Serializable)() -> "only one continuous source supported currently");
            Predef$.MODULE$.assert(this.offsetLog().get(epoch).isDefined(), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("offset for epoch ").append(epoch).append(" not reported before commit").toString());
            ContinuousExecution continuousExecution = this;
            synchronized (continuousExecution) {
                block7: {
                    this.recordTriggerOffsets(this.committedOffsets(), this.availableOffsets(), this.latestOffsets());
                    if (!this.queryExecutionThread().isAlive()) break block7;
                    this.commitLog().add(epoch, new CommitMetadata(CommitMetadata$.MODULE$.apply$default$1()));
                    Offset offset = ((SparkDataStream)this.sources().apply(0)).deserializeOffset(((Offset)((Option)((OffsetSeq)this.offsetLog().get(epoch).get()).offsets().apply(0)).get()).json());
                    this.committedOffsets_$eq(this.committedOffsets().$plus$plus((GenTraversableOnce<Tuple2<SparkDataStream, Offset>>)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.sources().apply(0)), (Object)offset), (List)Nil$.MODULE$)));
                    ((SparkDataStream)this.sources().apply(0)).commit(offset);
                    break block8;
                }
                return;
            }
        }
        if ((long)this.minLogEntriesToMaintain() <= epoch) {
            this.purge(epoch + 1L - (long)this.minLogEntriesToMaintain());
        }
        this.awaitProgressLock().lock();
        try {
            this.awaitProgressLockCondition().signalAll();
        }
        finally {
            this.awaitProgressLock().unlock();
        }
    }

    public void awaitEpoch(long epoch) {
        while (this.notDone$1(epoch)) {
            this.awaitProgressLock().lock();
            try {
                this.awaitProgressLockCondition().await(100L, TimeUnit.MILLISECONDS);
                if (this.streamDeathCause() == null) continue;
                throw this.streamDeathCause();
            }
            finally {
                this.awaitProgressLock().unlock();
            }
        }
    }

    public void stopInNewThread(Throwable error) {
        if (this.failure().compareAndSet(null, error)) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(26).append("Query ").append(this.prettyIdString()).append(" received exception ").append(error).toString());
            this.stopInNewThread();
            return;
        }
    }

    private void stopInNewThread() {
        new Thread(this){
            private final /* synthetic */ ContinuousExecution $outer;

            public void run() {
                try {
                    this.$outer.stop();
                }
                catch (Throwable e) {
                    this.$outer.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> e.getMessage(), e);
                    throw e;
                }
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super("stop-continuous-execution");
                this.setDaemon(true);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$3(java.lang.Throwable )}, serializedLambda);
            }
        }.start();
    }

    @Override
    public void stop() {
        this.state().set(TERMINATED$.MODULE$);
        if (this.queryExecutionThread().isAlive()) {
            this.interruptAndAwaitExecutionThreadTermination();
        }
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Query ").append(this.prettyIdString()).append(" was stopped").toString());
    }

    private final boolean notDone$1(long epoch$2) {
        Some some;
        Tuple2 tuple2;
        Option latestCommit = this.commitLog().getLatest();
        Option option = latestCommit;
        if (option instanceof Some && (tuple2 = (Tuple2)(some = (Some)option).value()) != null) {
            long latestEpoch = tuple2._1$mcJ$sp();
            return latestEpoch < epoch$2;
        }
        if (None$.MODULE$.equals(option)) {
            return true;
        }
        throw new MatchError(option);
    }

    public ContinuousExecution(SparkSession sparkSession, Trigger trigger, Clock triggerClock, scala.collection.immutable.Map<String, String> extraOptions, WriteToStream plan) {
        super(sparkSession, plan.name(), plan.resolvedCheckpointLocation(), plan.inputQuery(), plan.sink(), trigger, triggerClock, plan.outputMode(), plan.deleteCheckpointOnStop());
        Map v2ToRelationMap = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
        IntRef nextSourceId = IntRef.create((int)0);
        LogicalPlan _logicalPlan = (LogicalPlan)this.analyzedPlan().transform((PartialFunction)new scala.Serializable(this, v2ToRelationMap, nextSourceId){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ContinuousExecution $outer;
            private final Map v2ToRelationMap$1;
            private final IntRef nextSourceId$1;

            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                A1 A1 = x1;
                if (A1 instanceof StreamingRelationV2) {
                    StreamingRelationV2 streamingRelationV2 = (StreamingRelationV2)A1;
                    Option ds = streamingRelationV2.source();
                    String sourceName = streamingRelationV2.sourceName();
                    Table table = streamingRelationV2.table();
                    CaseInsensitiveStringMap options = streamingRelationV2.extraOptions();
                    Seq output = streamingRelationV2.output();
                    if (table instanceof SupportsRead) {
                        String dsStr;
                        SupportsRead supportsRead = (SupportsRead)table;
                        String string = dsStr = ds.nonEmpty() ? new StringBuilder(2).append("[").append(ds.get()).append("]").toString() : "";
                        if (!DataSourceV2Implicits$.MODULE$.TableHelper((Table)supportsRead).supports(TableCapability.CONTINUOUS_READ)) {
                            throw QueryExecutionErrors$.MODULE$.continuousProcessingUnsupportedByDataSourceError(sourceName);
                        }
                        return (B1)this.v2ToRelationMap$1.getOrElseUpdate((Object)streamingRelationV2, (Function0 & Serializable & scala.Serializable)() -> {
                            String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                            ++$this.nextSourceId$1.elem;
                            $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("Reading table [").append(supportsRead).append("] from DataSourceV2 named '").append(sourceName).append("' ").append(dsStr).toString());
                            Scan scan = supportsRead.newScanBuilder(options).build();
                            ContinuousStream stream = scan.toContinuousStream(metadataPath);
                            return new StreamingDataSourceV2Relation(output, scan, (SparkDataStream)stream, StreamingDataSourceV2Relation$.MODULE$.apply$default$4(), StreamingDataSourceV2Relation$.MODULE$.apply$default$5());
                        });
                    }
                }
                return (B1)function1.apply(x1);
            }

            public final boolean isDefinedAt(LogicalPlan x1) {
                StreamingRelationV2 streamingRelationV2;
                Table table;
                LogicalPlan logicalPlan2 = x1;
                return logicalPlan2 instanceof StreamingRelationV2 && (table = (streamingRelationV2 = (StreamingRelationV2)logicalPlan2).table()) instanceof SupportsRead;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.v2ToRelationMap$1 = v2ToRelationMap$1;
                this.nextSourceId$1 = nextSourceId$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$1(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$1 org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String java.lang.String org.apache.spark.sql.util.CaseInsensitiveStringMap scala.collection.Seq ), $anonfun$applyOrElse$2(org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String java.lang.String )}, serializedLambda);
            }
        });
        this.sources_$eq((Seq<ContinuousStream>)_logicalPlan.collect((PartialFunction)new scala.Serializable(null){
            public static final long serialVersionUID = 0L;

            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                A1 A1 = x2;
                if (A1 instanceof StreamingDataSourceV2Relation) {
                    StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                    return (B1)((ContinuousStream)streamingDataSourceV2Relation.stream());
                }
                return (B1)function1.apply(x2);
            }

            public final boolean isDefinedAt(LogicalPlan x2) {
                LogicalPlan logicalPlan2 = x2;
                return logicalPlan2 instanceof StreamingDataSourceV2Relation;
            }
        }));
        this.uniqueSources_$eq((scala.collection.immutable.Map<SparkDataStream, ReadLimit>)((TraversableOnce)((TraversableLike)this.sources().distinct()).map((Function1 & Serializable & scala.Serializable)s -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(s), (Object)ReadLimit.allAvailable()), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        Write write = this.createWrite((SupportsWrite)plan.sink(), extraOptions, _logicalPlan);
        if (this.hasDistributionRequirements(write) || this.hasOrderingRequirements(write)) {
            throw QueryCompilationErrors$.MODULE$.writeDistributionAndOrderingNotSupportedInContinuousExecution();
        }
        StreamingWrite streamingWrite = write.toStreaming();
        Seq customMetrics = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])write.supportedCustomMetrics())).toSeq();
        this.logicalPlan = new WriteToContinuousDataSource(streamingWrite, _logicalPlan, (Seq<CustomMetric>)customMetrics);
        Trigger trigger2 = super.trigger();
        if (!(trigger2 instanceof ContinuousTrigger)) {
            throw new IllegalStateException(new StringBuilder(29).append("Unsupported type of trigger: ").append(super.trigger()).toString());
        }
        ContinuousTrigger continuousTrigger = (ContinuousTrigger)trigger2;
        long t = continuousTrigger.intervalMs();
        this.org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor = new ProcessingTimeExecutor(new ProcessingTimeTrigger(t), super.triggerClock());
    }
}

