package org.apache.spark.api.python;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import javax.annotation.concurrent.GuardedBy;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.Python$;
import org.apache.spark.security.SocketAuthHelper;
import org.apache.spark.util.RedirectThread;
import org.apache.spark.util.RedirectThread$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.mutable.Queue;
import scala.collection.mutable.WeakHashMap;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.sys.package$;

/* compiled from: PythonWorkerFactory.scala */
@ScalaSignature(bytes = "\u0006\u0001\tUa!\u0002\u001b6\u0001ez\u0004\u0002\u0003'\u0001\u0005\u0003\u0005\u000b\u0011\u0002(\t\u0011e\u0003!\u0011!Q\u0001\niCQ!\u0018\u0001\u0005\u0002yCqa\u0019\u0001C\u0002\u0013%A\r\u0003\u0004i\u0001\u0001\u0006I!\u001a\u0005\bS\u0002\u0011\r\u0011\"\u0003k\u0011\u0019Y\u0007\u0001)A\u0005\u001d\"9A\u000e\u0001b\u0001\n\u0013Q\u0007BB7\u0001A\u0003%a\nC\u0004o\u0001\t\u0007I\u0011B8\t\rY\u0004\u0001\u0015!\u0003q\u0011\u001d9\b\u00011A\u0005\naD\u0011\"a\u0001\u0001\u0001\u0004%I!!\u0002\t\u000f\u0005E\u0001\u0001)Q\u0005s\"I\u0011q\u0006\u0001C\u0002\u0013\u0005\u0011\u0011\u0007\u0005\t\u0003\u007f\u0001\u0001\u0015!\u0003\u00024!I\u0011\u0011\t\u0001A\u0002\u0013%\u00111\t\u0005\n\u0003\u0017\u0002\u0001\u0019!C\u0005\u0003\u001bB\u0001\"!\u0015\u0001A\u0003&\u0011Q\t\u0005\n\u0003+\u0002!\u0019!C\u0005\u0003/B\u0001\"a\u001c\u0001A\u0003%\u0011\u0011\f\u0005\n\u0003g\u0002!\u0019!C\u0005\u0003kB\u0001\"! \u0001A\u0003%\u0011q\u000f\u0005\n\u0003\u0003\u0003\u0001\u0019!C\u0005\u0003\u0007C\u0011\"a#\u0001\u0001\u0004%I!!$\t\u0011\u0005E\u0005\u0001)Q\u0005\u0003\u000bC\u0011\"!&\u0001\u0005\u0004%I!a&\t\u0011\u0005m\u0005\u0001)A\u0005\u00033C\u0001\"a(\u0001\u0005\u0004%IA\u001b\u0005\b\u0003C\u0003\u0001\u0015!\u0003O\u0011\u001d\t\u0019\u000b\u0001C\u0001\u0003KCq!a-\u0001\t\u0003\t)\fC\u0004\u0002<\u0002!I!!*\t\u000f\u0005u\u0006\u0001\"\u0003\u0002@\"9\u00111\u0019\u0001\u0005\n\u0005\u0015\u0007bBAd\u0001\u0011%\u0011\u0011\u001a\u0004\u0007\u0003?\u0004A!!9\t\ru+C\u0011AAu\u0011\u001d\ty/\nC!\u0003\u000bDq!!=\u0001\t\u0013\t)\rC\u0004\u0002t\u0002!I!!2\t\u000f\u0005U\b\u0001\"\u0001\u0002F\"9\u0011q\u001f\u0001\u0005\u0002\u0005e\bbBA��\u0001\u0011\u0005!\u0011A\u0004\b\u0005\u000b)\u0004\u0012\u0002B\u0004\r\u0019!T\u0007#\u0003\u0003\n!1QL\fC\u0001\u0005\u0017A\u0011B!\u0004/\u0005\u0004%\t!a\u0011\t\u0011\t=a\u0006)A\u0005\u0003\u000bB\u0011B!\u0005/\u0005\u0004%\t!a!\t\u0011\tMa\u0006)A\u0005\u0003\u000b\u00131\u0
003U=uQ>twk\u001c:lKJ4\u0015m\u0019;pefT!AN\u001c\u0002\rALH\u000f[8o\u0015\tA\u0014(A\u0002ba&T!AO\u001e\u0002\u000bM\u0004\u0018M]6\u000b\u0005qj\u0014AB1qC\u000eDWMC\u0001?\u0003\ry'oZ\n\u0004\u0001\u00013\u0005CA!E\u001b\u0005\u0011%\"A\"\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0013%AB!osJ+g\r\u0005\u0002H\u00156\t\u0001J\u0003\u0002Js\u0005A\u0011N\u001c;fe:\fG.\u0003\u0002L\u0011\n9Aj\\4hS:<\u0017A\u00039zi\"|g.\u0012=fG\u000e\u0001\u0001CA(W\u001d\t\u0001F\u000b\u0005\u0002R\u00056\t!K\u0003\u0002T\u001b\u00061AH]8pizJ!!\u0016\"\u0002\rA\u0013X\rZ3g\u0013\t9\u0006L\u0001\u0004TiJLgn\u001a\u0006\u0003+\n\u000bq!\u001a8w-\u0006\u00148\u000f\u0005\u0003P7:s\u0015B\u0001/Y\u0005\ri\u0015\r]\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0007}\u000b'\r\u0005\u0002a\u00015\tQ\u0007C\u0003M\u0007\u0001\u0007a\nC\u0003Z\u0007\u0001\u0007!,A\u0005vg\u0016$\u0015-Z7p]V\tQ\r\u0005\u0002BM&\u0011qM\u0011\u0002\b\u0005>|G.Z1o\u0003))8/\u001a#bK6|g\u000eI\u0001\rI\u0006,Wn\u001c8N_\u0012,H.Z\u000b\u0002\u001d\u0006iA-Y3n_:lu\u000eZ;mK\u0002\nAb^8sW\u0016\u0014Xj\u001c3vY\u0016\fQb^8sW\u0016\u0014Xj\u001c3vY\u0016\u0004\u0013AC1vi\"DU\r\u001c9feV\t\u0001\u000f\u0005\u0002ri6\t!O\u0003\u0002ts\u0005A1/Z2ve&$\u00180\u0003\u0002ve\n\u00012k\\2lKR\fU\u000f\u001e5IK2\u0004XM]\u0001\fCV$\b\u000eS3ma\u0016\u0014\b%\u0001\u0004eC\u0016lwN\\\u000b\u0002sB\u0011!p`\u0007\u0002w*\u0011A0`\u0001\u0005Y\u0006twMC\u0001\u007f\u0003\u0011Q\u0017M^1\n\u0007\u0005\u00051PA\u0004Qe>\u001cWm]:\u0002\u0015\u0011\fW-\\8o?\u0012*\u0017\u000f\u0006\u0003\u0002\b\u00055\u0001cA!\u0002\n%\u0019\u00111\u0002\"\u0003\tUs\u0017\u000e\u001e\u0005\t\u0003\u001fi\u0011\u0011!a\u0001s\u0006\u0019\u0001\u0010J\u0019\u0002\u000f\u0011\fW-\\8oA!:a\"!\u0006\u0002*\u0005-\u0002\u0003BA\f\u0003Ki!!!\u0007\u000b\t\u0005m\u0011QD\u0001\u000bG>t7-\u001e:sK:$(\u0002BA\u0010\u0003C\t!\"\u00198o_R\fG/[8o\u0015\t\t\u0019#A\u0003kCZ\f\u00070\u0003\u0003\u0002(\u0005e!!C$vCJ$W\r\u001a\"z\u0003\u00151\u0018\r\\;fC\t\ti#\u
0001\u0003tK24\u0017A\u00033bK6|g\u000eS8tiV\u0011\u00111\u0007\t\u0005\u0003k\tY$\u0004\u0002\u00028)\u0019\u0011\u0011H?\u0002\u00079,G/\u0003\u0003\u0002>\u0005]\"aC%oKR\fE\r\u001a:fgN\f1\u0002Z1f[>t\u0007j\\:uA\u0005QA-Y3n_:\u0004vN\u001d;\u0016\u0005\u0005\u0015\u0003cA!\u0002H%\u0019\u0011\u0011\n\"\u0003\u0007%sG/\u0001\beC\u0016lwN\u001c)peR|F%Z9\u0015\t\u0005\u001d\u0011q\n\u0005\n\u0003\u001f\u0011\u0012\u0011!a\u0001\u0003\u000b\n1\u0002Z1f[>t\u0007k\u001c:uA!:1#!\u0006\u0002*\u0005-\u0012!\u00043bK6|gnV8sW\u0016\u00148/\u0006\u0002\u0002ZAA\u00111LA3\u0003S\n)%\u0004\u0002\u0002^)!\u0011qLA1\u0003\u001diW\u000f^1cY\u0016T1!a\u0019C\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0003O\niFA\u0006XK\u0006\\\u0007*Y:i\u001b\u0006\u0004\b\u0003BA\u001b\u0003WJA!!\u001c\u00028\t11k\\2lKR\fa\u0002Z1f[>twk\u001c:lKJ\u001c\b\u0005K\u0004\u0016\u0003+\tI#a\u000b\u0002\u0017%$G.Z,pe.,'o]\u000b\u0003\u0003o\u0002b!a\u0017\u0002z\u0005%\u0014\u0002BA>\u0003;\u0012Q!U;fk\u0016\fA\"\u001b3mK^{'o[3sg\u0002BsaFA\u000b\u0003S\tY#\u0001\bmCN$\u0018i\u0019;jm&$\u0018PT:\u0016\u0005\u0005\u0015\u0005cA!\u0002\b&\u0019\u0011\u0011\u0012\"\u0003\t1{gnZ\u0001\u0013Y\u0006\u001cH/Q2uSZLG/\u001f(t?\u0012*\u0017\u000f\u0006\u0003\u0002\b\u0005=\u0005\"CA\b3\u0005\u0005\t\u0019AAC\u0003=a\u0017m\u001d;BGRLg/\u001b;z\u001dN\u0004\u0003f\u0002\u000e\u0002\u0016\u0005%\u00121F\u0001\u000eg&l\u0007\u000f\\3X_J\\WM]:\u0016\u0005\u0005e\u0005cBA.\u0003K\nI'_\u0001\u000fg&l\u0007\u000f\\3X_J\\WM]:!Q\u001da\u0012QCA\u0015\u0003W\t!\u0002]=uQ>t\u0007+\u0019;i\u0003-\u0001\u0018\u0010\u001e5p]B\u000bG\u000f\u001b\u0011\u0002\r\r\u0014X-\u0019;f)\t\t9\u000bE\u0004B\u0003S\u000bI'!,\n\u0007\u0005-&I\u0001\u0004UkBdWM\r\t\u0006\u0003\u0006=\u0016QI\u0005\u0004\u0003c\u0013%AB(qi&|g.A\u000bde\u0016\fG/Z*ue\u0016\fW.\u001b8h/>\u00148.\u001a:\u0015\t\u0005\u001d\u0016q\u0017\u0005\u0007\u0003s\u0003\u0003\u0019\u0001(\u0002+M$(/Z1nS:<wk\u001c:lKJlu\u000eZ;mK\u0006\u00192M]3bi\u0016$\u0006N]8vO\"$
\u0015-Z7p]\u0006\u00112M]3bi\u0016\u001c\u0016.\u001c9mK^{'o[3s)\u0011\t9+!1\t\u000b1\u0014\u0003\u0019\u0001(\u0002\u0017M$\u0018M\u001d;EC\u0016lwN\u001c\u000b\u0003\u0003\u000f\tqC]3eSJ,7\r^*ue\u0016\fWn\u001d+p'R$WM\u001d:\u0015\r\u0005\u001d\u00111ZAn\u0011\u001d\ti\r\na\u0001\u0003\u001f\faa\u001d;e_V$\b\u0003BAi\u0003/l!!a5\u000b\u0007\u0005UW0\u0001\u0002j_&!\u0011\u0011\\Aj\u0005-Ie\u000e];u'R\u0014X-Y7\t\u000f\u0005uG\u00051\u0001\u0002P\u000611\u000f\u001e3feJ\u0014Q\"T8oSR|'\u000f\u00165sK\u0006$7cA\u0013\u0002dB\u0019!0!:\n\u0007\u0005\u001d8P\u0001\u0004UQJ,\u0017\r\u001a\u000b\u0003\u0003W\u00042!!<&\u001b\u0005\u0001\u0011a\u0001:v]\u0006\u00112\r\\3b]V\u0004\u0018\n\u001a7f/>\u00148.\u001a:t\u0003)\u0019Ho\u001c9EC\u0016lwN\\\u0001\u0005gR|\u0007/\u0001\u0006ti>\u0004xk\u001c:lKJ$B!a\u0002\u0002|\"9\u0011Q`\u0016A\u0002\u0005%\u0014AB<pe.,'/A\u0007sK2,\u0017m]3X_J\\WM\u001d\u000b\u0005\u0003\u000f\u0011\u0019\u0001C\u0004\u0002~2\u0002\r!!\u001b\u0002'AKH\u000f[8o/>\u00148.\u001a:GC\u000e$xN]=\u0011\u0005\u0001t3C\u0001\u0018A)\t\u00119!A\fQ%>\u001bUiU*`/\u0006KEk\u0018+J\u001b\u0016{U\u000bV0N'\u0006A\u0002KU(D\u000bN\u001bvlV!J)~#\u0016*T#P+R{Vj\u0015\u0011\u0002-%#E*R0X\u001fJ[UIU0U\u00136+u*\u0016+`\u001dN\u000bq#\u0013#M\u000b~;vJU&F%~#\u0016*T#P+R{fj\u0015\u0011")
/* loaded from: input_file:org/apache/spark/api/python/PythonWorkerFactory.class */
public class PythonWorkerFactory implements Logging {
    // First constructor argument (the Python executable, per the field name); also embedded in the
    // names of the idle-worker monitor thread and the stdout/stderr redirect threads.
    public final String org$apache$spark$api$python$PythonWorkerFactory$$pythonExec;
    // Second constructor argument; the constructor reads its "PYTHONPATH" entry when building pythonPath.
    private final Map<String, String> envVars;
    // True only when os.name does not start with "Windows" and the PYTHON_USE_DAEMON config value is true.
    private final boolean useDaemon;
    // Value of the PYTHON_DAEMON_MODULE config entry, or "pyspark.daemon" when that entry is empty.
    private final String daemonModule;
    // Value of the PYTHON_WORKER_MODULE config entry, or "pyspark.worker" when that entry is empty.
    private final String workerModule;
    // Built from the SparkEnv conf; used by createSocket$1 to authenticate each new daemon socket.
    private final SocketAuthHelper authHelper;

    // Daemon process handle: initialised to null and reset to null by stopDaemon().
    // NOTE(review): presumably assigned by startDaemon(), whose body is not visible here.
    @GuardedBy("self")
    private Process daemon;
    // Loopback address; createSocket$1 connects to the daemon on this host.
    private final InetAddress daemonHost;

    // Port used by createSocket$1: initialised to 0 and reset to 0 by stopDaemon().
    @GuardedBy("self")
    private int daemonPort;

    // Maps each daemon-created socket to the boxed int the daemon sent back on connect;
    // stopWorker writes that int to the daemon's stdin. NOTE(review): presumably the worker pid.
    @GuardedBy("self")
    private final WeakHashMap<Socket, Object> daemonWorkers;

    // Sockets handed back through releaseWorker in daemon mode; drained by create() and cleanupIdleWorkers.
    @GuardedBy("self")
    private final Queue<Socket> idleWorkers;

    // System.nanoTime() of the last releaseWorker call or monitor-driven cleanup; starts at 0.
    @GuardedBy("self")
    private long org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs;

    // Socket -> process map consulted by stopDaemon/stopWorker when the daemon is not used.
    // NOTE(review): presumably populated by createSimpleWorker, whose body is not visible here.
    @GuardedBy("self")
    private final WeakHashMap<Socket, Process> simpleWorkers;
    // Result of merging the Spark python path, envVars' "PYTHONPATH" and the process env "PYTHONPATH".
    private final String pythonPath;
    // Backing field for the Logging trait's logger accessor pair.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /* compiled from: PythonWorkerFactory.scala */
    /* loaded from: input_file:org/apache/spark/api/python/PythonWorkerFactory$MonitorThread.class */
    private class MonitorThread extends Thread {
        public final /* synthetic */ PythonWorkerFactory $outer;

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.spark.api.python.PythonWorkerFactory] */
        /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v5, types: [int] */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                ?? org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer = org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer();
                synchronized (org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer) {
                    org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer = (PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_NS() > (System.nanoTime() - org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs()) ? 1 : (PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_NS() == (System.nanoTime() - org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs()) ? 0 : -1));
                    if (org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer < 0) {
                        org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers();
                        org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs_$eq(System.nanoTime());
                    }
                }
                Thread.sleep(10000L);
            }
        }

        public /* synthetic */ PythonWorkerFactory org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer() {
            return this.$outer;
        }

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public MonitorThread(PythonWorkerFactory pythonWorkerFactory) {
            super(new StringBuilder(24).append("Idle Worker Monitor for ").append(pythonWorkerFactory.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec).toString());
            if (pythonWorkerFactory == null) {
                throw null;
            }
            this.$outer = pythonWorkerFactory;
            setDaemon(true);
        }
    }

    /**
     * Static forwarder to the companion singleton's {@code IDLE_WORKER_TIMEOUT_NS()} value, which
     * the monitor thread compares against a {@code System.nanoTime()} difference.
     */
    public static long IDLE_WORKER_TIMEOUT_NS() {
        return PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_NS();
    }

    /**
     * Static forwarder to the companion singleton's {@code PROCESS_WAIT_TIMEOUT_MS()} value.
     * NOTE(review): presumably milliseconds, per the name; no use is visible in this file.
     */
    public static int PROCESS_WAIT_TIMEOUT_MS() {
        return PythonWorkerFactory$.MODULE$.PROCESS_WAIT_TIMEOUT_MS();
    }

    /** Forwards to the static {@code Logging.logName$} helper with this instance as the receiver. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Forwards to the static {@code Logging.log$} helper and returns the logger it yields. */
    public Logger log() {
        return Logging.log$(this);
    }

    /** Forwards the message supplier to the static {@code Logging.logInfo$} helper. */
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    /** Forwards the message supplier to the static {@code Logging.logDebug$} helper. */
    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    /** Forwards the message supplier to the static {@code Logging.logTrace$} helper. */
    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    /** Forwards the message supplier to the static {@code Logging.logWarning$} helper. */
    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    /** Forwards the message supplier to the static {@code Logging.logError$} helper. */
    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    /** Forwards the message supplier and its throwable to the static {@code Logging.logInfo$} helper. */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    /** Forwards the message supplier and its throwable to the static {@code Logging.logDebug$} helper. */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    /** Forwards the message supplier and its throwable to the static {@code Logging.logTrace$} helper. */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    /** Forwards the message supplier and its throwable to the static {@code Logging.logWarning$} helper. */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    /** Forwards the message supplier and its throwable to the static {@code Logging.logError$} helper. */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    /** Forwards to the static {@code Logging.isTraceEnabled$} helper and returns its answer. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Forwards the single flag to the static {@code Logging.initializeLogIfNecessary$} helper. */
    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    /** Forwards both flags to the static {@code Logging.initializeLogIfNecessary$} helper and returns its result. */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    /** Returns the default for the second {@code initializeLogIfNecessary} argument, as supplied by the Logging helper. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Forwards both flags to the static {@code Logging.initializeForcefully$} helper. */
    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /** Returns the logger currently stored in the Logging backing field (may be unset). */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /** Stores {@code logger} in the Logging backing field. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** Returns the flag that makes {@code create()} go through the daemon instead of a simple worker. */
    private boolean useDaemon() {
        return useDaemon;
    }

    /** Returns the daemon module name resolved in the constructor. */
    private String daemonModule() {
        return daemonModule;
    }

    /** Returns the worker module name that {@code create()} passes to {@code createSimpleWorker}. */
    private String workerModule() {
        return workerModule;
    }

    /** Returns the helper used to authenticate freshly opened daemon sockets. */
    private SocketAuthHelper authHelper() {
        return authHelper;
    }

    /** Returns the current daemon process handle, or {@code null} when none is recorded. */
    private Process daemon() {
        return daemon;
    }

    /** Replaces the recorded daemon process handle with {@code process} (which may be {@code null}). */
    private void daemon_$eq(Process process) {
        daemon = process;
    }

    /** Returns the loopback address on which daemon sockets are opened. */
    public InetAddress daemonHost() {
        return daemonHost;
    }

    /** Returns the port currently recorded for the daemon (0 when none is recorded). */
    private int daemonPort() {
        return daemonPort;
    }

    /** Records {@code i} as the daemon port. */
    private void daemonPort_$eq(int i) {
        daemonPort = i;
    }

    /** Returns the map from daemon-created sockets to the boxed int received from the daemon. */
    private WeakHashMap<Socket, Object> daemonWorkers() {
        return daemonWorkers;
    }

    /** Returns the queue of released sockets awaiting reuse or cleanup. */
    private Queue<Socket> idleWorkers() {
        return idleWorkers;
    }

    /** Returns the {@code System.nanoTime()} reading stored at the last recorded activity. */
    public long org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs() {
        return org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs;
    }

    /** Stores {@code j} as the last-activity timestamp. */
    public void org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs_$eq(long j) {
        org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs = j;
    }

    /** Returns the socket-to-process map used when workers are not created through the daemon. */
    private WeakHashMap<Socket, Process> simpleWorkers() {
        return simpleWorkers;
    }

    /** Returns the merged Python path computed in the constructor. */
    private String pythonPath() {
        return pythonPath;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v16, types: [scala.Tuple2<java.net.Socket, scala.Option<java.lang.Object>>, scala.Tuple2] */
    /* JADX WARN: Type inference failed for: r0v4 */
    /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Throwable] */
    public Tuple2<Socket, Option<Object>> create() {
        if (!useDaemon()) {
            return createSimpleWorker(workerModule());
        }
        ?? r0 = this;
        synchronized (r0) {
            if (!idleWorkers().nonEmpty()) {
                return createThroughDaemon();
            }
            Socket socket = (Socket) idleWorkers().dequeue();
            r0 = new Tuple2(socket, daemonWorkers().get(socket));
            return r0;
        }
    }

    /**
     * Creates a worker for the given module by delegating directly to {@code createSimpleWorker};
     * the daemon is never consulted on this path.
     *
     * @param str module name handed unchanged to {@code createSimpleWorker}
     * @return whatever {@code createSimpleWorker} returns for that module
     */
    public Tuple2<Socket, Option<Object>> createStreamingWorker(String str) {
        return createSimpleWorker(str);
    }

    /**
     * Calls {@code startDaemon()} and then opens a worker socket to the daemon, all while holding
     * this factory's monitor.
     *
     * @return the socket/value pair produced by {@code liftedTree1$1()}
     */
    private Tuple2<Socket, Option<Object>> createThroughDaemon() {
        synchronized (this) {
            startDaemon();
            return liftedTree1$1();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:41:0x0061, code lost:
    
        if (r0.equals(org.sparkproject.jetty.servlet.ServletHandler.__DEFAULT_SERVLET) == false) goto L10;
     */
    /* JADX WARN: Multi-variable type inference failed */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder: the real body of this method could not be reconstructed, so as written
     * here it unconditionally throws {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): callers pass a Python module name as {@code r8} and expect a socket/value
     * pair back; the actual launch logic is only available in the bytecode.
     */
    private scala.Tuple2<java.net.Socket, scala.Option<java.lang.Object>> createSimpleWorker(java.lang.String r8) {
        /*
            Method dump skipped, instructions count: 454
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(java.lang.String):scala.Tuple2");
    }

    /* JADX WARN: Code restructure failed: missing block: B:40:0x005d, code lost:
    
        if (r0.equals(org.sparkproject.jetty.servlet.ServletHandler.__DEFAULT_SERVLET) == false) goto L12;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder: the real body of this synchronized method could not be reconstructed,
     * so as written here it unconditionally throws {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): presumably launches the daemon process and records its handle and port;
     * confirm against the bytecode.
     */
    private synchronized void startDaemon() {
        /*
            Method dump skipped, instructions count: 669
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.spark.api.python.PythonWorkerFactory.startDaemon():void");
    }

    /**
     * Starts two {@code RedirectThread}s that copy the given streams to {@code System.err}.
     * Any exception raised while creating or starting them is logged as an error and not rethrown.
     *
     * @param inputStream stream handled by the thread named "stdout reader for ..."
     * @param inputStream2 stream handled by the thread named "stderr reader for ..."
     */
    private void redirectStreamsToStderr(InputStream inputStream, InputStream inputStream2) {
        try {
            String stdoutThreadName = "stdout reader for " + org$apache$spark$api$python$PythonWorkerFactory$$pythonExec;
            new RedirectThread(inputStream, System.err, stdoutThreadName, RedirectThread$.MODULE$.$lessinit$greater$default$4()).start();
            String stderrThreadName = "stderr reader for " + org$apache$spark$api$python$PythonWorkerFactory$$pythonExec;
            new RedirectThread(inputStream2, System.err, stderrThreadName, RedirectThread$.MODULE$.$lessinit$greater$default$4()).start();
        } catch (Exception redirectFailure) {
            logError(() -> "Exception in redirecting streams", redirectFailure);
        }
    }

    /**
     * Drains the idle-worker queue, closing each socket in turn. A failure on one socket is logged
     * as a warning and the drain continues with the next one.
     */
    public void org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers() {
        while (!idleWorkers().isEmpty()) {
            try {
                Socket idle = (Socket) idleWorkers().dequeue();
                idle.close();
            } catch (Exception closeFailure) {
                logWarning(() -> "Failed to close worker socket", closeFailure);
            }
        }
    }

    /**
     * Shuts down whatever this factory started, while holding this factory's monitor.
     *
     * <p>In daemon mode the idle sockets are closed, the daemon process (if any) is destroyed and
     * the recorded daemon handle and port are reset. Otherwise every process registered in
     * {@code simpleWorkers} is destroyed.
     */
    private void stopDaemon() {
        synchronized (this) {
            if (useDaemon()) {
                org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers();
                if (daemon() != null) {
                    daemon().destroy();
                }
                daemon_$eq(null);
                daemonPort_$eq(0);
            } else {
                // mapValues only builds a lazy view whose function runs on value access, so the
                // processes were never destroyed; iterate the values eagerly instead.
                simpleWorkers().values().foreach(process -> {
                    process.destroy();
                    return BoxedUnit.UNIT;
                });
            }
        }
    }

    /** Public shutdown entry point; simply delegates to {@code stopDaemon()}. */
    public void stop() {
        stopDaemon();
    }

    /**
     * Stops the worker behind {@code socket} and then closes the socket.
     *
     * <p>While holding this factory's monitor: in daemon mode with a recorded daemon, the int
     * stored for the socket in {@code daemonWorkers} (if any) is written to the daemon's stdin and
     * flushed; without the daemon, the process registered for the socket in {@code simpleWorkers}
     * (if any) is destroyed.
     *
     * @param socket the worker's socket; it is closed after the monitor is released
     */
    public void stopWorker(Socket socket) {
        synchronized (this) {
            if (useDaemon()) {
                if (daemon() != null) {
                    daemonWorkers().get(socket).foreach(workerId -> {
                        DataOutputStream toDaemon = new DataOutputStream(daemon().getOutputStream());
                        toDaemon.writeInt(BoxesRunTime.unboxToInt(workerId));
                        toDaemon.flush();
                        daemon().getOutputStream().flush();
                        return BoxedUnit.UNIT;
                    });
                }
            } else {
                simpleWorkers().get(socket).foreach(workerProcess -> {
                    workerProcess.destroy();
                    return BoxedUnit.UNIT;
                });
            }
        }
        socket.close();
    }

    /**
     * Hands a worker socket back to the factory.
     *
     * <p>Without the daemon the socket is closed, and a close failure is logged as a warning. With
     * the daemon, and while holding this factory's monitor, the last-activity timestamp is
     * refreshed and the socket is appended to the idle queue for reuse.
     *
     * @param socket the worker socket being released
     */
    public void releaseWorker(Socket socket) {
        if (!useDaemon()) {
            try {
                socket.close();
            } catch (Exception closeFailure) {
                logWarning(() -> "Failed to close worker socket", closeFailure);
            }
            return;
        }
        synchronized (this) {
            org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs_$eq(System.nanoTime());
            idleWorkers().enqueue(Predef$.MODULE$.wrapRefArray(new Socket[]{socket}));
        }
    }

    /**
     * Opens a socket to the daemon, reads the int the daemon sends first, authenticates the socket
     * and registers it in {@code daemonWorkers}.
     *
     * <p>If anything fails after the socket has been opened (the read, a negative code, the
     * authentication or the registration), the socket is closed before the failure propagates so
     * the connection is not leaked.
     *
     * @return the socket paired with {@code Some} of the int read from the daemon
     * @throws IllegalStateException if the daemon reports a negative code
     */
    private final Tuple2 createSocket$1() {
        Socket socket = new Socket(daemonHost(), daemonPort());
        boolean handedOff = false;
        try {
            int launchCode = new DataInputStream(socket.getInputStream()).readInt();
            if (launchCode < 0) {
                throw new IllegalStateException("Python daemon failed to launch worker with code " + launchCode);
            }
            authHelper().authToServer(socket);
            daemonWorkers().put(socket, BoxesRunTime.boxToInteger(launchCode));
            Tuple2 created = new Tuple2<>(socket, new Some<>(BoxesRunTime.boxToInteger(launchCode)));
            handedOff = true;
            return created;
        } finally {
            if (!handedOff) {
                // Best-effort close; the original failure is the one that must reach the caller.
                try {
                    socket.close();
                } catch (Exception closeFailure) {
                    logWarning(() -> "Failed to close worker socket", closeFailure);
                }
            }
        }
    }

    /**
     * Tries to open a daemon worker socket. On a {@link SocketException} it logs two warnings,
     * stops and restarts the daemon, and tries exactly once more; a failure on the second attempt
     * propagates to the caller.
     *
     * @return the socket/value pair from {@code createSocket$1()}
     */
    private final /* synthetic */ Tuple2 liftedTree1$1() {
        try {
            return createSocket$1();
        } catch (SocketException connectFailure) {
            logWarning(() -> "Failed to open socket to Python daemon:", connectFailure);
            logWarning(() -> "Assuming that daemon unexpectedly quit, attempting to restart");
            stopDaemon();
            startDaemon();
            return createSocket$1();
        }
    }

    /**
     * Builds a factory for the given Python executable and environment.
     *
     * <p>Reads the daemon/worker settings from the current {@code SparkEnv} configuration (logging
     * an info message when a custom daemon or worker module is configured), initialises the empty
     * worker bookkeeping structures, computes the merged Python path and finally starts the
     * idle-worker monitor thread.
     *
     * @param str the Python executable; stored and used in thread names
     * @param map environment variables; its "PYTHONPATH" entry contributes to the merged Python path
     */
    public PythonWorkerFactory(String str, Map<String, String> map) {
        this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec = str;
        this.envVars = map;
        Logging.$init$(this);
        // The daemon is never used on Windows, regardless of configuration.
        this.useDaemon = !System.getProperty("os.name").startsWith("Windows") && BoxesRunTime.unboxToBoolean(SparkEnv$.MODULE$.get().conf().get(Python$.MODULE$.PYTHON_USE_DAEMON()));
        this.daemonModule = (String) ((Option) SparkEnv$.MODULE$.get().conf().get(Python$.MODULE$.PYTHON_DAEMON_MODULE())).map(str2 -> {
            this.logInfo(() -> {
                return new StringBuilder(186).append("Python daemon module in PySpark is set to [").append(str2).append("] in '").append(Python$.MODULE$.PYTHON_DAEMON_MODULE().key()).append("', ").append("using this to start the daemon up. Note that this configuration only has an effect when ").append("'").append(Python$.MODULE$.PYTHON_USE_DAEMON().key()).append("' is enabled and the platform is not Windows.").toString();
            });
            return str2;
        }).getOrElse(() -> {
            return "pyspark.daemon";
        });
        this.workerModule = (String) ((Option) SparkEnv$.MODULE$.get().conf().get(Python$.MODULE$.PYTHON_WORKER_MODULE())).map(str3 -> {
            this.logInfo(() -> {
                return new StringBuilder(182).append("Python worker module in PySpark is set to [").append(str3).append("] in '").append(Python$.MODULE$.PYTHON_WORKER_MODULE().key()).append("', ").append("using this to start the worker up. Note that this configuration only has an effect when ").append("'").append(Python$.MODULE$.PYTHON_USE_DAEMON().key()).append("' is disabled or the platform is Windows.").toString();
            });
            return str3;
        }).getOrElse(() -> {
            return "pyspark.worker";
        });
        this.authHelper = new SocketAuthHelper(SparkEnv$.MODULE$.get().conf());
        this.daemon = null;
        this.daemonHost = InetAddress.getLoopbackAddress();
        this.daemonPort = 0;
        this.daemonWorkers = new WeakHashMap<>();
        this.idleWorkers = new Queue<>();
        this.org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs = 0L;
        this.simpleWorkers = new WeakHashMap<>();
        this.pythonPath = PythonUtils$.MODULE$.mergePythonPaths(Predef$.MODULE$.wrapRefArray(new String[]{PythonUtils$.MODULE$.sparkPythonPath(), (String) map.getOrElse("PYTHONPATH", () -> {
            return "";
        }), (String) package$.MODULE$.env().getOrElse("PYTHONPATH", () -> {
            return "";
        })}));
        // Start the monitor last so that `this` is only handed to another thread once every field
        // has been assigned, and no thread is left running if an earlier step throws.
        new MonitorThread(this).start();
    }
}
