/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.connect.client;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Stream;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.spark.connect.proto.AddArtifactsRequest;
import org.apache.spark.connect.proto.AddArtifactsResponse;
import org.apache.spark.connect.proto.ArtifactStatusesRequest;
import org.apache.spark.connect.proto.ArtifactStatusesResponse;
import org.apache.spark.sql.connect.client.Artifact;
import org.apache.spark.sql.connect.client.Artifact$;
import org.apache.spark.sql.connect.client.ClassFinder;
import org.apache.spark.sql.connect.client.CustomSparkConnectBlockingStub;
import org.apache.spark.sql.connect.client.CustomSparkConnectStub;
import org.apache.spark.sql.connect.client.SparkConnectClient;
import org.apache.spark.util.SparkFileUtils$;
import org.apache.spark.util.SparkThreadUtils$;
import scala.Function1;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Builder;
import scala.concurrent.Awaitable;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.LongRef;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005mf\u0001\u0002\f\u0018\u0001\u0011B\u0001b\u000b\u0001\u0003\u0002\u0003\u0006I\u0001\f\u0005\ti\u0001\u0011\t\u0011)A\u0005k!A\u0001\t\u0001B\u0001B\u0003%\u0011\t\u0003\u0005E\u0001\t\u0005\t\u0015!\u0003F\u0011\u0015A\u0005\u0001\"\u0001J\u0011\u001dy\u0005A1A\u0005\nACa\u0001\u0016\u0001!\u0002\u0013\t\u0006BB+\u0001A\u0003%a\u000bC\u0003d\u0001\u0011\u0005A\rC\u0003k\u0001\u0011\u00051\u000eC\u0003o\u0001\u0011%q\u000e\u0003\u0004k\u0001\u0011\u0005\u0011\u0011\u0002\u0005\b\u0003\u001b\u0001A\u0011AA\b\u0011!\t9\u0002\u0001C\u0001/\u0005e\u0001bBA\u0013\u0001\u0011\u0005\u0011q\u0005\u0005\t\u0003s\u0001A\u0011A\f\u0002<!A\u0011Q\b\u0001\u0005\u0002m\ty\u0004\u0003\u0005\u0002\u000e\u0001!\taFA+\u0011\u001d\t\t\u0007\u0001C\u0005\u0003GBq!a#\u0001\t\u0013\ti\tC\u0004\u00022\u0002!I!a-\u0003\u001f\u0005\u0013H/\u001b4bGRl\u0015M\\1hKJT!\u0001G\r\u0002\r\rd\u0017.\u001a8u\u0015\tQ2$A\u0004d_:tWm\u0019;\u000b\u0005qi\u0012aA:rY*\u0011adH\u0001\u0006gB\f'o\u001b\u0006\u0003A\u0005\na!\u00199bG\",'\"\u0001\u0012\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001)\u0003C\u0001\u0014*\u001b\u00059#\"\u0001\u0015\u0002\u000bM\u001c\u0017\r\\1\n\u0005):#AB!osJ+g-\u0001\u0007dY&,g\u000e^\"p]\u001aLw\r\u0005\u0002.c9\u0011afL\u0007\u0002/%\u0011\u0001gF\u0001\u0013'B\f'o[\"p]:,7\r^\"mS\u0016tG/\u0003\u00023g\ti1i\u001c8gS\u001e,(/\u0019;j_:T!\u0001M\f\u0002\u0013M,7o]5p]&#\u0007C\u0001\u001c>\u001d\t94\b\u0005\u00029O5\t\u0011H\u0003\u0002;G\u00051AH]8pizJ!\u0001P\u0014\u0002\rA\u0013X\rZ3g\u0013\tqtH\u0001\u0004TiJLgn\u001a\u0006\u0003y\u001d\nQAY:uk\n\u0004\"A\f\"\n\u0005\r;\"AH\"vgR|Wn\u00159be.\u001cuN\u001c8fGR\u0014En\\2lS:<7\u000b^;c\u0003\u0011\u0019H/\u001e2\u0011\u000592\u0015BA$\u0018\u0005Y\u0019Uo\u001d;p[N\u0003\u0018M]6D_:tWm\u0019;TiV\u0014\u0017A\u0002\u001fj]&$h\bF\u0003K\u00172ke\n\u0005\u0002/\u0001!)1&\u0002a\u0001Y!)A'\u0002a\u0001k!)\u0001)\u0002a\u0001\u0003\")A)\u0002a\u0001\u000b\u0006Q1\tS+O\u0017~\u001b
\u0016JW#\u0016\u0003E\u0003\"A\n*\n\u0005M;#aA%oi\u0006Y1\tS+O\u0017~\u001b\u0016JW#!\u00031\u0019G.Y:t\r&tG-\u001a:t!\r9f\fY\u0007\u00021*\u0011\u0011LW\u0001\u000bG>t7-\u001e:sK:$(BA.]\u0003\u0011)H/\u001b7\u000b\u0003u\u000bAA[1wC&\u0011q\f\u0017\u0002\u0015\u0007>\u0004\u0018p\u00148Xe&$X-\u0011:sCfd\u0015n\u001d;\u0011\u00059\n\u0017B\u00012\u0018\u0005-\u0019E.Y:t\r&tG-\u001a:\u0002'I,w-[:uKJ\u001cE.Y:t\r&tG-\u001a:\u0015\u0005\u0015D\u0007C\u0001\u0014g\u0013\t9wE\u0001\u0003V]&$\b\"B5\n\u0001\u0004\u0001\u0017A\u00024j]\u0012,'/A\u0006bI\u0012\f%\u000f^5gC\u000e$HCA3m\u0011\u0015i'\u00021\u00016\u0003\u0011\u0001\u0018\r\u001e5\u0002\u001dA\f'o]3BeRLg-Y2ugR\u0011\u0001\u000f \t\u0004cZLhB\u0001:u\u001d\tA4/C\u0001)\u0013\t)x%A\u0004qC\u000e\\\u0017mZ3\n\u0005]D(aA*fc*\u0011Qo\n\t\u0003]iL!a_\f\u0003\u0011\u0005\u0013H/\u001b4bGRDQ!`\u0006A\u0002y\f1!\u001e:j!\ry\u0018QA\u0007\u0003\u0003\u0003Q1!a\u0001]\u0003\rqW\r^\u0005\u0005\u0003\u000f\t\tAA\u0002V%&#2!ZA\u0006\u0011\u0015iH\u00021\u0001\u007f\u00031\tG\rZ!si&4\u0017m\u0019;t)\r)\u0017\u0011\u0003\u0005\b\u0003'i\u0001\u0019AA\u000b\u0003\u0011)(/[:\u0011\u0007E4h0\u0001\tjg\u000e\u000b7\r[3e\u0003J$\u0018NZ1diR!\u00111DA\u0011!\r1\u0013QD\u0005\u0004\u0003?9#a\u0002\"p_2,\u0017M\u001c\u0005\u0007\u0003Gq\u0001\u0019A\u001b\u0002\t!\f7\u000f[\u0001\u000eG\u0006\u001c\u0007.Z!si&4\u0017m\u0019;\u0015\u0007U\nI\u0003C\u0004\u0002,=\u0001\r!!\f\u0002\t\tdwN\u0019\t\u0006M\u0005=\u00121G\u0005\u0004\u0003c9#!B!se\u0006L\bc\u0001\u0014\u00026%\u0019\u0011qG\u0014\u0003\t\tKH/Z\u0001\u001ckBdw.\u00193BY2\u001cE.Y:t\r&dW-\u0011:uS\u001a\f7\r^:\u0015\u0003\u0015\f1\"\u00193e\u00072\f7o\u001d#jeR\u0019Q-!\u0011\t\u000f\u0005\r\u0013\u00031\u0001\u0002F\u0005!!-Y:f!\u0011\t9%!\u0015\u000e\u0005\u0005%#\u0002BA&\u0003\u001b\nAAZ5mK*\u0019\u0011q\n/\u0002\u00079Lw.\u0003\u0003\u0002T\u0005%#\u0001\u0002)bi\"$2!ZA,\u0011\u001d\tIF\u0005a\u0001\u00037\n\u0011\"\u0019:uS\u001a\f7\r^:\u0011\tE\fi&_\u0005\u0004\u0003?B
(\u0001C%uKJ\f'\r\\3\u0002'\u0005$GMQ1uG\",G-\u0011:uS\u001a\f7\r^:\u0015\u000b\u0015\f)'a\u001a\t\r\u0005e3\u00031\u0001q\u0011\u001d\tIg\u0005a\u0001\u0003W\naa\u001d;sK\u0006l\u0007CBA7\u0003s\ni(\u0004\u0002\u0002p)\u0019A)!\u001d\u000b\t\u0005M\u0014QO\u0001\u0005OJ\u00048M\u0003\u0002\u0002x\u0005\u0011\u0011n\\\u0005\u0005\u0003w\nyG\u0001\bTiJ,\u0017-\\(cg\u0016\u0014h/\u001a:\u0011\t\u0005}\u0014qQ\u0007\u0003\u0003\u0003SA!a!\u0002\u0006\u0006)\u0001O]8u_*\u0011!$H\u0005\u0005\u0003\u0013\u000b\tIA\nBI\u0012\f%\u000f^5gC\u000e$8OU3rk\u0016\u001cH/A\u0007sK\u0006$g*\u001a=u\u0007\",hn\u001b\u000b\u0005\u0003\u001f\u000b\u0019\u000b\u0005\u0003\u0002\u0012\u0006}UBAAJ\u0015\u0011\t)*a&\u0002\u0011A\u0014x\u000e^8ck\u001aTA!!'\u0002\u001c\u00061qm\\8hY\u0016T!!!(\u0002\u0007\r|W.\u0003\u0003\u0002\"\u0006M%A\u0003\"zi\u0016\u001cFO]5oO\"9\u0011Q\u0015\u000bA\u0002\u0005\u001d\u0016AA5o!\u0011\tI+!,\u000e\u0005\u0005-&bAA<9&!\u0011qVAV\u0005-Ie\u000e];u'R\u0014X-Y7\u0002%\u0005$Gm\u00115v].,G-\u0011:uS\u001a\f7\r\u001e\u000b\u0006K\u0006U\u0016\u0011\u0018\u0005\u0007\u0003o+\u0002\u0019A=\u0002\u0011\u0005\u0014H/\u001b4bGRDq!!\u001b\u0016\u0001\u0004\tY\u0007")
public class ArtifactManager {
    /** Client configuration; its user context and user agent are copied onto every request built here. */
    private final SparkConnectClient.Configuration clientConfig;
    /** Session id set on every request built by this manager. */
    private final String sessionId;
    /** Blocking stub, used for the artifact-status lookup in {@code isCachedArtifact}. */
    private final CustomSparkConnectBlockingStub bstub;
    /** Streaming stub, used to open the {@code addArtifacts} request stream. */
    private final CustomSparkConnectStub stub;
    /**
     * Size threshold in bytes (assigned 32768 in the constructor): read-buffer size for chunked
     * uploads, the limit above which an artifact is chunked, and the size cap of one batch.
     */
    private final int CHUNK_SIZE;
    /** Class finders registered via {@code registerClassFinder}; queried by {@code uploadAllClassFileArtifacts}. */
    private final CopyOnWriteArrayList<ClassFinder> classFinders;

    /**
     * Accessor for the chunk-size field.
     *
     * @return the value of the {@code CHUNK_SIZE} field
     */
    private int CHUNK_SIZE() {
        return CHUNK_SIZE;
    }

    /**
     * Appends a class finder to the list consulted by {@code uploadAllClassFileArtifacts}.
     *
     * @param finder the finder to register
     */
    public void registerClassFinder(ClassFinder finder) {
        classFinders.add(finder);
    }

    /**
     * Resolves {@code path} to a URI via {@code SparkFileUtils.resolveURI} and uploads the artifact
     * it refers to.
     *
     * @param path location of the artifact, as a string
     */
    public void addArtifact(String path) {
        URI resolved = SparkFileUtils$.MODULE$.resolveURI(path);
        this.addArtifact(resolved);
    }

    private Seq<Artifact> parseArtifacts(URI uri) {
        String string = uri.getScheme();
        if ("file".equals(string)) {
            Artifact artifact;
            Path path = Paths.get(uri);
            String string2 = ((Object)path.getFileName()).toString();
            if (string2.endsWith(".jar")) {
                artifact = Artifact$.MODULE$.newJarArtifact(path.getFileName(), new Artifact.LocalFile(path));
            } else if (string2.endsWith(".class")) {
                artifact = Artifact$.MODULE$.newClassArtifact(path.getFileName(), new Artifact.LocalFile(path));
            } else {
                throw new UnsupportedOperationException(new StringBuilder(24).append("Unsuppoted file format: ").append(string2).toString());
            }
            Artifact artifact2 = artifact;
            return (Seq)new .colon.colon((Object)artifact2, (List)Nil$.MODULE$);
        }
        throw new UnsupportedOperationException(new StringBuilder(20).append("Unsupported scheme: ").append(string).toString());
    }

    /**
     * Parses {@code uri} into its artifact(s) and uploads them.
     *
     * @param uri location of the artifact
     */
    public void addArtifact(URI uri) {
        Seq<Artifact> parsed = this.parseArtifacts(uri);
        // The cast keeps overload resolution on the Iterable<Artifact> variant.
        this.addArtifacts((Iterable<Artifact>)parsed);
    }

    /**
     * Parses every URI into its artifact(s), flattens the results and uploads them together.
     *
     * @param uris locations of the artifacts
     */
    public void addArtifacts(Seq<URI> uris) {
        Function1 parseOne = (Function1 & Serializable & scala.Serializable)uri -> this.parseArtifacts((URI)uri);
        Object flattened = uris.flatMap(parseOne, Seq$.MODULE$.canBuildFrom());
        // The cast keeps overload resolution on the Iterable<Artifact> variant.
        this.addArtifacts((Iterable<Artifact>)((Iterable)flattened));
    }

    /**
     * Asks the server, through the blocking stub, for the status of the artifact named
     * {@code <CACHE_PREFIX>/<hash>}.
     *
     * @param hash hash that identifies the cached artifact
     * @return the {@code exists} flag reported for that name, or {@code false} when the response
     *         contains no entry for it
     */
    public boolean isCachedArtifact(String hash) {
        String prefixWithSlash = Predef.any2stringadd$.MODULE$.$plus$extension(Predef$.MODULE$.any2stringadd((Object)Artifact$.MODULE$.CACHE_PREFIX()), "/");
        String cachedName = new StringBuilder(0).append(prefixWithSlash).append(hash).toString();

        ArtifactStatusesRequest.Builder requestBuilder = ArtifactStatusesRequest.newBuilder()
            .setUserContext(this.clientConfig.userContext())
            .setClientType(this.clientConfig.userAgent())
            .setSessionId(this.sessionId)
            .addAllNames(Arrays.asList((Object[])new String[]{cachedName}));
        ArtifactStatusesRequest statusRequest = requestBuilder.build();

        Map<String, ArtifactStatusesResponse.ArtifactStatus> statusByName = this.bstub.artifactStatus(statusRequest).getStatusesMap();
        return statusByName.containsKey(cachedName) && statusByName.get(cachedName).getExists();
    }

    /**
     * Makes sure {@code blob} is present in the server-side cache and returns its key.
     *
     * <p>The key is the SHA-256 hex digest of the bytes. The blob is uploaded as an in-memory cache
     * artifact only when {@code isCachedArtifact} reports that the server does not have it.
     *
     * @param blob bytes to cache
     * @return SHA-256 hex digest of {@code blob}
     */
    public String cacheArtifact(byte[] blob) {
        String digest = DigestUtils.sha256Hex((byte[])blob);
        if (!this.isCachedArtifact(digest)) {
            Artifact cacheEntry = Artifact$.MODULE$.newCacheArtifact(digest, new Artifact.InMemory(blob));
            this.addArtifacts((Iterable<Artifact>)Nil$.MODULE$.$colon$colon((Object)cacheEntry));
        }
        return digest;
    }

    /**
     * Collects the artifacts returned by {@code findClasses()} on every registered class finder and
     * uploads them in one {@code addArtifacts} call.
     */
    public void uploadAllClassFileArtifacts() {
        Object finders = JavaConverters$.MODULE$.asScalaBufferConverter(this.classFinders).asScala();
        Function1 findAll = (Function1 & Serializable & scala.Serializable)finder -> ((ClassFinder)finder).findClasses();
        Object discovered = ((TraversableLike)finders).flatMap(findAll, Buffer$.MODULE$.canBuildFrom());
        this.addArtifacts((Iterable<Artifact>)((Iterable)discovered));
    }

    /**
     * Uploads every regular file ending in {@code .class} found under {@code base}.
     *
     * <p>Each artifact is named by its path relative to {@code base} and backed by the local file.
     * Nothing happens when {@code base} is not a directory.
     *
     * @param base root directory to walk
     */
    public void addClassDir(Path base) {
        if (!Files.isDirectory(base)) {
            return;
        }
        Builder collected = Seq$.MODULE$.newBuilder();
        // Files.walk holds open directory handles, so the stream must be closed.
        try (Stream<Path> entries = Files.walk(base)) {
            entries
                .filter(entry -> Files.isRegularFile(entry) && entry.toString().endsWith(".class"))
                .forEach(classFile -> collected.$plus$eq((Object)Artifact$.MODULE$.newClassArtifact(base.relativize(classFile), new Artifact.LocalFile(classFile))));
        }
        this.addArtifacts((Iterable<Artifact>)((Iterable)collected.result()));
    }

    /**
     * Uploads the given artifacts over a single {@code addArtifacts} request stream and blocks until
     * the server's response stream completes or fails.
     *
     * <p>Each artifact is routed by {@code $anonfun$addArtifacts$2}: artifacts larger than
     * {@code CHUNK_SIZE} are streamed in chunks, the others are accumulated into batches. Any batch
     * still pending after the last artifact is flushed before the request stream is completed.
     *
     * <p>Returns immediately, without contacting the server, when {@code artifacts} is empty.
     *
     * @param artifacts artifacts to upload
     */
    public void addArtifacts(Iterable<Artifact> artifacts) {
        if (artifacts.isEmpty()) {
            return;
        }
        // Completed by the response observer below; awaited at the end of this method.
        final Promise promise = Promise$.MODULE$.apply();
        StreamObserver<AddArtifactsResponse> responseHandler = new StreamObserver<AddArtifactsResponse>(){
            /** Summaries received so far across all response messages. */
            private final Buffer<AddArtifactsResponse.ArtifactSummary> summaries = (Buffer)Buffer$.MODULE$.empty();

            @Override
            public void onNext(AddArtifactsResponse v) {
                v.getArtifactsList().forEach(summary -> this.summaries.$plus$eq(summary));
            }

            @Override
            public void onError(Throwable throwable) {
                promise.failure(throwable);
            }

            @Override
            public void onCompleted() {
                promise.success((Object)this.summaries.toSeq());
            }
        };
        StreamObserver<AddArtifactsRequest> stream = this.stub.addArtifacts(responseHandler);
        Buffer currentBatch = (Buffer)Buffer$.MODULE$.empty();
        LongRef currentBatchSize = LongRef.create((long)0L);
        artifacts.iterator().foreach((Function1 & Serializable & scala.Serializable)artifact -> {
            ArtifactManager.$anonfun$addArtifacts$2(this, currentBatch, stream, currentBatchSize, (Artifact)artifact);
            return BoxedUnit.UNIT;
        });
        // Flush whatever small artifacts are still waiting in the last batch.
        if (currentBatch.nonEmpty()) {
            this.writeBatch$1(currentBatch, stream, currentBatchSize);
        }
        stream.onCompleted();
        SparkThreadUtils$.MODULE$.awaitResult((Awaitable)promise.future(), (Duration)Duration$.MODULE$.Inf());
    }

    /**
     * Sends a group of artifacts to the server as one batch request.
     *
     * <p>Each artifact is read in full, together with the CRC32 of its bytes, and appended to the
     * batch section of a single {@code AddArtifactsRequest}, which is then emitted on {@code stream}.
     *
     * @param artifacts artifacts to place in the batch
     * @param stream request stream the batch is written to
     */
    private void addBatchedArtifacts(Seq<Artifact> artifacts, StreamObserver<AddArtifactsRequest> stream) {
        AddArtifactsRequest.Builder builder = AddArtifactsRequest.newBuilder().setUserContext(this.clientConfig.userContext()).setClientType(this.clientConfig.userAgent()).setSessionId(this.sessionId);
        artifacts.foreach((Function1 & Serializable & scala.Serializable)artifact -> {
            AddArtifactsRequest.SingleChunkArtifact singleChunkArtifact;
            // CheckedInputStream updates the CRC32 as the bytes are read.
            try (CheckedInputStream in = new CheckedInputStream(artifact.storage().stream(), new CRC32());){
                try {
                    AddArtifactsRequest.ArtifactChunk.Builder data = AddArtifactsRequest.ArtifactChunk.newBuilder().setData(ByteString.readFrom(in)).setCrc(in.getChecksum().getValue());
                    // addArtifactsBuilder() attaches the new entry to the shared request builder.
                    singleChunkArtifact = builder.getBatchBuilder().addArtifactsBuilder().setName(((Object)artifact.path()).toString()).setData(data).build();
                }
                catch (Throwable throwable) {
                    Throwable throwable2 = throwable;
                    Option option = NonFatal$.MODULE$.unapply(throwable2);
                    // Failures matched by NonFatal are reported on the stream before being rethrown;
                    // everything else is rethrown without notifying the stream.
                    if (!option.isEmpty()) {
                        Throwable e = (Throwable)option.get();
                        stream.onError(e);
                        throw e;
                    }
                    throw throwable;
                }
            }
            return singleChunkArtifact;
        });
        stream.onNext(builder.build());
    }

    /**
     * Reads up to {@code CHUNK_SIZE} bytes from {@code in}, continuing to read until the buffer is
     * full or the stream reports end-of-input.
     *
     * @param in stream to read from
     * @return the bytes read, or an empty {@code ByteString} when nothing was read
     */
    private ByteString readNextChunk(InputStream in) {
        final int capacity = this.CHUNK_SIZE();
        byte[] buffer = new byte[capacity];
        int filled = 0;
        while (filled < capacity) {
            int justRead = in.read(buffer, filled, capacity - filled);
            if (justRead == -1) {
                // End of stream: return whatever has been gathered so far.
                break;
            }
            filled += justRead;
        }
        return filled == 0 ? ByteString.empty() : ByteString.copyFrom(buffer, 0, filled);
    }

    /**
     * Streams a single artifact to the server as a sequence of chunk requests.
     *
     * <p>The first request carries a begin-chunk section (artifact name, total bytes, number of
     * chunks, and the first data chunk); each later request carries one further data chunk. Every
     * chunk is sent with the CRC32 of its own bytes.
     *
     * @param artifact artifact to upload
     * @param stream request stream the chunks are written to
     */
    private void addChunkedArtifact(Artifact artifact, StreamObserver<AddArtifactsRequest> stream) {
        AddArtifactsRequest.Builder builder = AddArtifactsRequest.newBuilder().setUserContext(this.clientConfig.userContext()).setClientType(this.clientConfig.userAgent()).setSessionId(this.sessionId);
        // CheckedInputStream updates the CRC32 as the bytes are read.
        try (CheckedInputStream in = new CheckedInputStream(artifact.storage().stream(), new CRC32());){
            try {
                AddArtifactsRequest.ArtifactChunk.Builder artifactChunkBuilder = AddArtifactsRequest.ArtifactChunk.newBuilder();
                ByteString dataChunk = this.readNextChunk(in);
                builder.getBeginChunkBuilder().setName(((Object)artifact.path()).toString()).setTotalBytes(artifact.size()).setNumChunks(this.getNumChunks$1(artifact.size())).setInitialChunk(artifactChunkBuilder.setData(dataChunk).setCrc(in.getChecksum().getValue()));
                stream.onNext(builder.build());
                // Reset so the next CRC covers only the next chunk's bytes.
                in.getChecksum().reset();
                // The same request builder is reused; drop the begin-chunk section before sending plain chunks.
                builder.clearBeginChunk();
                dataChunk = this.readNextChunk(in);
                // readNextChunk returns an empty ByteString once no more bytes can be read.
                while (!dataChunk.isEmpty()) {
                    artifactChunkBuilder.setData(dataChunk).setCrc(in.getChecksum().getValue());
                    builder.setChunk(artifactChunkBuilder.build());
                    stream.onNext(builder.build());
                    in.getChecksum().reset();
                    builder.clearChunk();
                    dataChunk = this.readNextChunk(in);
                }
            }
            catch (Throwable throwable) {
                Throwable throwable2 = throwable;
                Option option = NonFatal$.MODULE$.unapply(throwable2);
                // Failures matched by NonFatal are reported on the stream before being rethrown;
                // everything else is rethrown without notifying the stream.
                if (!option.isEmpty()) {
                    Throwable e = (Throwable)option.get();
                    stream.onError(e);
                    throw e;
                }
                throw throwable;
            }
        }
    }

    /**
     * Appends {@code dep} to the pending batch and adds {@code size} to the running batch size.
     *
     * @param dep artifact to enqueue
     * @param size size added to the batch total
     * @param currentBatch$1 pending batch, mutated in place
     * @param currentBatchSize$1 running size of the pending batch, mutated in place
     */
    private static final void addToBatch$1(Artifact dep, long size, Buffer currentBatch$1, LongRef currentBatchSize$1) {
        currentBatch$1.$plus$eq((Object)dep);
        currentBatchSize$1.elem = currentBatchSize$1.elem + size;
    }

    /**
     * Sends the pending batch through {@code addBatchedArtifacts}, then empties the batch and
     * resets its running size to zero.
     *
     * @param currentBatch$1 pending batch, cleared after sending
     * @param stream$1 request stream the batch is written to
     * @param currentBatchSize$1 running size of the pending batch, reset to 0
     */
    private final void writeBatch$1(Buffer currentBatch$1, StreamObserver stream$1, LongRef currentBatchSize$1) {
        Seq<Artifact> pending = (Seq<Artifact>)currentBatch$1.toSeq();
        this.addBatchedArtifacts(pending, stream$1);
        currentBatchSize$1.elem = 0L;
        currentBatch$1.clear();
    }

    /**
     * Routes one artifact during an upload.
     *
     * <p>An artifact larger than {@code CHUNK_SIZE} is sent on its own through
     * {@code addChunkedArtifact}, after flushing any non-empty pending batch. Any other artifact
     * joins the pending batch; the batch is flushed first when adding the artifact would push the
     * batch size past {@code CHUNK_SIZE}.
     *
     * @param $this manager performing the upload
     * @param currentBatch$1 pending batch of small artifacts
     * @param stream$1 request stream
     * @param currentBatchSize$1 running size of the pending batch
     * @param artifact artifact to route
     */
    public static final /* synthetic */ void $anonfun$addArtifacts$2(ArtifactManager $this, Buffer currentBatch$1, StreamObserver stream$1, LongRef currentBatchSize$1, Artifact artifact) {
        Artifact.LocalData storage = artifact.storage();
        long artifactSize = storage.size();
        boolean needsChunking = artifactSize > (long)$this.CHUNK_SIZE();
        if (needsChunking) {
            if (currentBatch$1.nonEmpty()) {
                $this.writeBatch$1(currentBatch$1, stream$1, currentBatchSize$1);
            }
            $this.addChunkedArtifact(artifact, stream$1);
        } else {
            boolean batchWouldOverflow = currentBatchSize$1.elem + artifactSize > (long)$this.CHUNK_SIZE();
            if (batchWouldOverflow) {
                $this.writeBatch$1(currentBatch$1, stream$1, currentBatchSize$1);
            }
            ArtifactManager.addToBatch$1(artifact, artifactSize, currentBatch$1, currentBatchSize$1);
        }
    }

    /**
     * Computes {@code size / CHUNK_SIZE} rounded up.
     *
     * @param size total size in bytes
     * @return {@code (size + CHUNK_SIZE - 1) / CHUNK_SIZE}
     */
    private final long getNumChunks$1(long size) {
        long roundingOffset = (long)(this.CHUNK_SIZE() - 1);
        long chunkSize = (long)this.CHUNK_SIZE();
        return (size + roundingOffset) / chunkSize;
    }

    /**
     * Creates a manager bound to one session.
     *
     * <p>The chunk size is fixed at 32768 and the list of class finders starts empty.
     *
     * @param clientConfig client configuration supplying the user context and user agent
     * @param sessionId session id placed on every request
     * @param bstub blocking stub
     * @param stub streaming stub
     */
    public ArtifactManager(SparkConnectClient.Configuration clientConfig, String sessionId, CustomSparkConnectBlockingStub bstub, CustomSparkConnectStub stub) {
        this.stub = stub;
        this.bstub = bstub;
        this.sessionId = sessionId;
        this.clientConfig = clientConfig;
        CHUNK_SIZE = 32768;
        classFinders = new CopyOnWriteArrayList<>();
    }
}

