package org.apache.spark.api.python;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.Arrays;
import java.util.List;
import javax.annotation.concurrent.GuardedBy;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkException;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.Python$;
import org.apache.spark.security.SocketAuthHelper;
import org.apache.spark.util.RedirectThread;
import org.apache.spark.util.RedirectThread$;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Queue;
import scala.collection.mutable.WeakHashMap;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.sys.package$;

/* compiled from: PythonWorkerFactory.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005mh!B\u001a5\u0001ar\u0004\u0002C&\u0001\u0005\u0003\u0005\u000b\u0011B'\t\u0011a\u0003!\u0011!Q\u0001\neCQ\u0001\u0018\u0001\u0005\u0002uCqA\u0019\u0001C\u0002\u0013%1\r\u0003\u0004h\u0001\u0001\u0006I\u0001\u001a\u0005\bQ\u0002\u0011\r\u0011\"\u0003j\u0011\u0019Q\u0007\u0001)A\u0005\u001b\"91\u000e\u0001b\u0001\n\u0013I\u0007B\u00027\u0001A\u0003%Q\nC\u0004n\u0001\t\u0007I\u0011\u00028\t\rU\u0004\u0001\u0015!\u0003p\u0011\u001d1\b\u00011A\u0005\n]D\u0011\"!\u0001\u0001\u0001\u0004%I!a\u0001\t\u000f\u0005=\u0001\u0001)Q\u0005q\"I\u0011Q\u0006\u0001C\u0002\u0013\u0005\u0011q\u0006\u0005\t\u0003{\u0001\u0001\u0015!\u0003\u00022!I\u0011q\b\u0001A\u0002\u0013%\u0011\u0011\t\u0005\n\u0003\u0013\u0002\u0001\u0019!C\u0005\u0003\u0017B\u0001\"a\u0014\u0001A\u0003&\u00111\t\u0005\n\u0003'\u0002!\u0019!C\u0005\u0003+B\u0001\"!\u001c\u0001A\u0003%\u0011q\u000b\u0005\n\u0003c\u0002!\u0019!C\u0005\u0003gB\u0001\"a\u001f\u0001A\u0003%\u0011Q\u000f\u0005\n\u0003\u007f\u0002\u0001\u0019!C\u0005\u0003\u0003C\u0011\"!#\u0001\u0001\u0004%I!a#\t\u0011\u0005=\u0005\u0001)Q\u0005\u0003\u0007C\u0011\"a%\u0001\u0005\u0004%I!!&\t\u0011\u0005e\u0005\u0001)A\u0005\u0003/C\u0001\"!(\u0001\u0005\u0004%I!\u001b\u0005\b\u0003?\u0003\u0001\u0015!\u0003N\u0011\u001d\t\t\u000b\u0001C\u0001\u0003GCq!!*\u0001\t\u0013\t\u0019\u000bC\u0004\u0002(\u0002!I!a)\t\u000f\u0005%\u0006\u0001\"\u0003\u0002,\"9\u0011Q\u0016\u0001\u0005\n\u0005=fABAc\u0001\u0011\t9\r\u0003\u0004]I\u0011\u0005\u0011q\u001a\u0005\b\u0003+$C\u0011IAV\u0011\u001d\t9\u000e\u0001C\u0005\u0003WCq!!7\u0001\t\u0013\tY\u000bC\u0004\u0002\\\u0002!\t!a+\t\u000f\u0005u\u0007\u0001\"\u0001\u0002`\"9\u0011Q\u001d\u0001\u0005\u0002\u0005\u001dxaBAvi!%\u0011Q\u001e\u0004\u0007gQBI!a<\t\rqkC\u0011AAy\u0011%\t\u00190\fb\u0001\n\u0003\t\t\u0005\u0003\u0005\u0002v6\u0002\u000b\u0011BA\"\u0011%\t90\fb\u0001\n\u0003\t\t\t\u0003\u0005\u0002z6\u0002\u000b\u0011BAB\u0005M\u0001\u0016\u0010\u001e5p]^{'o[3s\r\u0006\u001cGo\u001c:z\u0015\t)d'\u0001\u0004qsRDwN\u001c\u0006\u0003oa\n1!\u00199j\u0015\tI$(A\u0003ta\u0006\u00148N\u0003\u0002<y\u00051\u0011\r]1dQ\u0016T\u0011!P\u0001\u0004_J<7c\u0001\u0001@\u000bB\u0011\u0001iQ\u0007\u0002\u0003*\t!)A\u0003tG\u0006d\u0017-\u0003\u0002E\u0003\n1\u0011I\\=SK\u001a\u0004\"AR%\u000e\u0003\u001dS!\u0001\u0013\u001d\u0002\u0011%tG/\u001a:oC2L!AS$\u0003\u000f1{wmZ5oO\u0006Q\u0001/\u001f;i_:,\u00050Z2\u0004\u0001A\u0011a*\u0016\b\u0003\u001fN\u0003\"\u0001U!\u000e\u0003ES!A\u0015'\u0002\rq\u0012xn\u001c;?\u0013\t!\u0016)\u0001\u0004Qe\u0016$WMZ\u0005\u0003-^\u0013aa\u0015;sS:<'B\u0001+B\u0003\u001d)gN\u001e,beN\u0004BA\u0014.N\u001b&\u00111l\u0016\u0002\u0004\u001b\u0006\u0004\u0018A\u0002\u001fj]&$h\bF\u0002_A\u0006\u0004\"a\u0018\u0001\u000e\u0003QBQaS\u0002A\u00025CQ\u0001W\u0002A\u0002e\u000b\u0011\"^:f\t\u0006,Wn\u001c8\u0016\u0003\u0011\u0004\"\u0001Q3\n\u0005\u0019\f%a\u0002\"p_2,\u0017M\\\u0001\u000bkN,G)Y3n_:\u0004\u0013\u0001\u00043bK6|g.T8ek2,W#A'\u0002\u001b\u0011\fW-\\8o\u001b>$W\u000f\\3!\u000319xN]6fe6{G-\u001e7f\u000359xN]6fe6{G-\u001e7fA\u0005Q\u0011-\u001e;i\u0011\u0016d\u0007/\u001a:\u0016\u0003=\u0004\"\u0001]:\u000e\u0003ET!A\u001d\u001d\u0002\u0011M,7-\u001e:jifL!\u0001^9\u0003!M{7m[3u\u0003V$\b\u000eS3ma\u0016\u0014\u0018aC1vi\"DU\r\u001c9fe\u0002\na\u0001Z1f[>tW#\u0001=\u0011\u0005etX\"\u0001>\u000b\u0005md\u0018\u0001\u00027b]\u001eT\u0011!`\u0001\u0005U\u00064\u0018-\u0003\u0002��u\n9\u0001K]8dKN\u001c\u0018A\u00033bK6|gn\u0018\u0013fcR!\u0011QAA\u0006!\r\u0001\u0015qA\u0005\u0004\u0003\u0013\t%\u0001B+oSRD\u0001\"!\u0004\u000e\u0003\u0003\u0005\r\u0001_\u0001\u0004q\u0012\n\u0014a\u00023bK6|g\u000e\t\u0015\b\u001d\u0005M\u0011qEA\u0015!\u0011\t)\"a\t\u000e\u0005\u0005]!\u0002BA\r\u00037\t!bY8oGV\u0014(/\u001a8u\u0015\u0011\ti\"a\b\u0002\u0015\u0005tgn\u001c;bi&|gN\u0003\u0002\u0002\"\u0005)!.\u0019<bq&!\u0011QEA\f\u0005%9U/\u0019:eK\u0012\u0014\u00150A\u0003wC2,X-\t\u0002\u0002,\u0005!1/\u001a7g\u0003)!\u0017-Z7p]\"{7\u000f^\u000b\u0003\u0003c\u0001B!a\r\u0002:5\u0011\u0011Q\u0007\u0006\u0004\u0003oa\u0018a\u00018fi&!\u00111HA\u001b\u0005-Ie.\u001a;BI\u0012\u0014Xm]:\u0002\u0017\u0011\fW-\\8o\u0011>\u001cH\u000fI\u0001\u000bI\u0006,Wn\u001c8Q_J$XCAA\"!\r\u0001\u0015QI\u0005\u0004\u0003\u000f\n%aA%oi\u0006qA-Y3n_:\u0004vN\u001d;`I\u0015\fH\u0003BA\u0003\u0003\u001bB\u0011\"!\u0004\u0013\u0003\u0003\u0005\r!a\u0011\u0002\u0017\u0011\fW-\\8o!>\u0014H\u000f\t\u0015\b'\u0005M\u0011qEA\u0015\u00035!\u0017-Z7p]^{'o[3sgV\u0011\u0011q\u000b\t\t\u00033\n\u0019'a\u001a\u0002D5\u0011\u00111\f\u0006\u0005\u0003;\ny&A\u0004nkR\f'\r\\3\u000b\u0007\u0005\u0005\u0014)\u0001\u0006d_2dWm\u0019;j_:LA!!\u001a\u0002\\\tYq+Z1l\u0011\u0006\u001c\b.T1q!\u0011\t\u0019$!\u001b\n\t\u0005-\u0014Q\u0007\u0002\u0007'>\u001c7.\u001a;\u0002\u001d\u0011\fW-\\8o/>\u00148.\u001a:tA!:Q#a\u0005\u0002(\u0005%\u0012aC5eY\u0016<vN]6feN,\"!!\u001e\u0011\r\u0005e\u0013qOA4\u0013\u0011\tI(a\u0017\u0003\u000bE+X-^3\u0002\u0019%$G.Z,pe.,'o\u001d\u0011)\u000f]\t\u0019\"a\n\u0002*\u0005qA.Y:u\u0003\u000e$\u0018N^5us:\u001bXCAAB!\r\u0001\u0015QQ\u0005\u0004\u0003\u000f\u000b%\u0001\u0002'p]\u001e\f!\u0003\\1ti\u0006\u001bG/\u001b<jift5o\u0018\u0013fcR!\u0011QAAG\u0011%\ti!GA\u0001\u0002\u0004\t\u0019)A\bmCN$\u0018i\u0019;jm&$\u0018PT:!Q\u001dQ\u00121CA\u0014\u0003S\tQb]5na2,wk\u001c:lKJ\u001cXCAAL!\u001d\tI&a\u0019\u0002ha\fab]5na2,wk\u001c:lKJ\u001c\b\u0005K\u0004\u001d\u0003'\t9#!\u000b\u0002\u0015ALH\u000f[8o!\u0006$\b.A\u0006qsRDwN\u001c)bi\"\u0004\u0013AB2sK\u0006$X\r\u0006\u0002\u0002h\u0005\u00192M]3bi\u0016$\u0006N]8vO\"$\u0015-Z7p]\u0006\u00112M]3bi\u0016\u001c\u0016.\u001c9mK^{'o[3s\u0003-\u0019H/\u0019:u\t\u0006,Wn\u001c8\u0015\u0005\u0005\u0015\u0011a\u0006:fI&\u0014Xm\u0019;TiJ,\u0017-\\:U_N#H-\u001a:s)\u0019\t)!!-\u0002B\"9\u00111W\u0012A\u0002\u0005U\u0016AB:uI>,H\u000f\u0005\u0003\u00028\u0006uVBAA]\u0015\r\tY\f`\u0001\u0003S>LA!a0\u0002:\nY\u0011J\u001c9viN#(/Z1n\u0011\u001d\t\u0019m\ta\u0001\u0003k\u000baa\u001d;eKJ\u0014(!D'p]&$xN\u001d+ie\u0016\fGmE\u0002%\u0003\u0013\u00042!_Af\u0013\r\tiM\u001f\u0002\u0007)\"\u0014X-\u00193\u0015\u0005\u0005E\u0007cAAjI5\t\u0001!A\u0002sk:\f!c\u00197fC:,\b/\u00133mK^{'o[3sg\u0006Q1\u000f^8q\t\u0006,Wn\u001c8\u0002\tM$x\u000e]\u0001\u000bgR|\u0007oV8sW\u0016\u0014H\u0003BA\u0003\u0003CDq!a9+\u0001\u0004\t9'\u0001\u0004x_J\\WM]\u0001\u000ee\u0016dW-Y:f/>\u00148.\u001a:\u0015\t\u0005\u0015\u0011\u0011\u001e\u0005\b\u0003G\\\u0003\u0019AA4\u0003M\u0001\u0016\u0010\u001e5p]^{'o[3s\r\u0006\u001cGo\u001c:z!\tyVf\u0005\u0002.\u007fQ\u0011\u0011Q^\u0001\u0018!J{5)R*T?^\u000b\u0015\nV0U\u00136+u*\u0016+`\u001bN\u000b\u0001\u0004\u0015*P\u0007\u0016\u001b6kX,B\u0013R{F+S'F\u001fV#v,T*!\u0003YIE\tT#`/>\u00136*\u0012*`)&kUiT+U?:\u001b\u0016aF%E\u0019\u0016{vk\u0014*L\u000bJ{F+S'F\u001fV#vLT*!\u0001")
/* loaded from: input_file:org/apache/spark/api/python/PythonWorkerFactory.class */
public class PythonWorkerFactory implements Logging {
    public final String org$apache$spark$api$python$PythonWorkerFactory$$pythonExec;
    private final Map<String, String> envVars;
    private final boolean useDaemon;
    private final String daemonModule;
    private final String workerModule;
    private final SocketAuthHelper authHelper;

    @GuardedBy("self")
    private Process daemon;
    private final InetAddress daemonHost;

    @GuardedBy("self")
    private int daemonPort;

    @GuardedBy("self")
    private final WeakHashMap<Socket, Object> daemonWorkers;

    @GuardedBy("self")
    private final Queue<Socket> idleWorkers;

    @GuardedBy("self")
    private long org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs;

    @GuardedBy("self")
    private final WeakHashMap<Socket, Process> simpleWorkers;
    private final String pythonPath;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /* compiled from: PythonWorkerFactory.scala */
    /* loaded from: input_file:org/apache/spark/api/python/PythonWorkerFactory$MonitorThread.class */
    public class MonitorThread extends Thread {
        public final /* synthetic */ PythonWorkerFactory $outer;

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.spark.api.python.PythonWorkerFactory] */
        /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v5, types: [int] */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                ?? org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer = org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer();
                synchronized (org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer) {
                    org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer = (PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_NS() > (System.nanoTime() - org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs()) ? 1 : (PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_NS() == (System.nanoTime() - org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs()) ? 0 : -1));
                    if (org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer < 0) {
                        org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers();
                        org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs_$eq(System.nanoTime());
                    }
                }
                Thread.sleep(10000L);
            }
        }

        public /* synthetic */ PythonWorkerFactory org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer() {
            return this.$outer;
        }

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public MonitorThread(PythonWorkerFactory pythonWorkerFactory) {
            super(new StringBuilder(24).append("Idle Worker Monitor for ").append(pythonWorkerFactory.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec).toString());
            if (pythonWorkerFactory == null) {
                throw null;
            }
            this.$outer = pythonWorkerFactory;
            setDaemon(true);
        }
    }

    public static long IDLE_WORKER_TIMEOUT_NS() {
        return PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_NS();
    }

    public static int PROCESS_WAIT_TIMEOUT_MS() {
        return PythonWorkerFactory$.MODULE$.PROCESS_WAIT_TIMEOUT_MS();
    }

    @Override // org.apache.spark.internal.Logging
    public String logName() {
        return logName();
    }

    @Override // org.apache.spark.internal.Logging
    public Logger log() {
        return log();
    }

    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0) {
        logInfo(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0) {
        logDebug(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0) {
        logTrace(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0) {
        logWarning(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0) {
        logError(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        logInfo(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        logDebug(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        logTrace(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        logWarning(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0, Throwable th) {
        logError(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean isTraceEnabled() {
        return isTraceEnabled();
    }

    @Override // org.apache.spark.internal.Logging
    public void initializeLogIfNecessary(boolean z) {
        initializeLogIfNecessary(z);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return initializeLogIfNecessary(z, z2);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary$default$2() {
        return initializeLogIfNecessary$default$2();
    }

    @Override // org.apache.spark.internal.Logging
    public void initializeForcefully(boolean z, boolean z2) {
        initializeForcefully(z, z2);
    }

    @Override // org.apache.spark.internal.Logging
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    @Override // org.apache.spark.internal.Logging
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private boolean useDaemon() {
        return this.useDaemon;
    }

    private String daemonModule() {
        return this.daemonModule;
    }

    private String workerModule() {
        return this.workerModule;
    }

    private SocketAuthHelper authHelper() {
        return this.authHelper;
    }

    private Process daemon() {
        return this.daemon;
    }

    private void daemon_$eq(Process process) {
        this.daemon = process;
    }

    public InetAddress daemonHost() {
        return this.daemonHost;
    }

    private int daemonPort() {
        return this.daemonPort;
    }

    private void daemonPort_$eq(int i) {
        this.daemonPort = i;
    }

    private WeakHashMap<Socket, Object> daemonWorkers() {
        return this.daemonWorkers;
    }

    private Queue<Socket> idleWorkers() {
        return this.idleWorkers;
    }

    public long org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs() {
        return this.org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs;
    }

    public void org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs_$eq(long j) {
        this.org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs = j;
    }

    private WeakHashMap<Socket, Process> simpleWorkers() {
        return this.simpleWorkers;
    }

    private String pythonPath() {
        return this.pythonPath;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15, types: [java.net.Socket] */
    /* JADX WARN: Type inference failed for: r0v4 */
    /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Throwable] */
    public Socket create() {
        if (!useDaemon()) {
            return createSimpleWorker();
        }
        ?? r0 = this;
        synchronized (r0) {
            if (!idleWorkers().nonEmpty()) {
                return createThroughDaemon();
            }
            r0 = (Socket) idleWorkers().dequeue();
            return r0;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Socket createThroughDaemon() {
        Socket liftedTree1$1;
        synchronized (this) {
            startDaemon();
            liftedTree1$1 = liftedTree1$1();
        }
        return liftedTree1$1;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Socket createSimpleWorker() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress((byte[]) Array$.MODULE$.apply(Predef$.MODULE$.wrapByteArray(new byte[]{Byte.MAX_VALUE, 0, 0, 1}), ClassTag$.MODULE$.Byte())));
            ProcessBuilder processBuilder = new ProcessBuilder((List<String>) Arrays.asList(this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec, "-m", workerModule()));
            java.util.Map<String, String> environment = processBuilder.environment();
            environment.putAll((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(this.envVars).asJava());
            environment.put("PYTHONPATH", pythonPath());
            environment.put("PYTHONUNBUFFERED", "YES");
            environment.put("PYTHON_WORKER_FACTORY_PORT", BoxesRunTime.boxToInteger(serverSocket.getLocalPort()).toString());
            environment.put("PYTHON_WORKER_FACTORY_SECRET", authHelper().secret());
            Process start = processBuilder.start();
            redirectStreamsToStderr(start.getInputStream(), start.getErrorStream());
            serverSocket.setSoTimeout(10000);
            try {
                Socket accept = serverSocket.accept();
                authHelper().authClient(accept);
                synchronized (this) {
                    simpleWorkers().put(accept, start);
                }
                if (serverSocket != null) {
                    serverSocket.close();
                }
                return accept;
            } catch (Exception e) {
                throw new SparkException("Python worker failed to connect back.", e);
            }
        } catch (Throwable th) {
            if (serverSocket != null) {
                serverSocket.close();
            }
            throw th;
        }
    }

    private synchronized void startDaemon() {
        if (daemon() != null) {
            return;
        }
        try {
            List asList = Arrays.asList(this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec, "-m", daemonModule());
            ProcessBuilder processBuilder = new ProcessBuilder((List<String>) asList);
            java.util.Map<String, String> environment = processBuilder.environment();
            environment.putAll((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(this.envVars).asJava());
            environment.put("PYTHONPATH", pythonPath());
            environment.put("PYTHON_WORKER_FACTORY_SECRET", authHelper().secret());
            environment.put("PYTHONUNBUFFERED", "YES");
            daemon_$eq(processBuilder.start());
            DataInputStream dataInputStream = new DataInputStream(daemon().getInputStream());
            try {
                daemonPort_$eq(dataInputStream.readInt());
                if (daemonPort() >= 1 && daemonPort() <= 65535) {
                    redirectStreamsToStderr(dataInputStream, daemon().getErrorStream());
                    return;
                }
                String daemonModule = daemonModule();
                Integer boxToInteger = BoxesRunTime.boxToInteger(daemonPort());
                int daemonPort = daemonPort();
                throw new SparkException(new StringOps(Predef$.MODULE$.augmentString(new StringOps("\n            |Bad data in %s's standard output. Invalid port number:\n            |  %s (0x%08x)\n            |Python command to execute the daemon was:\n            |  %s\n            |Check that you don't have any unexpected modules or libraries in\n            |your PYTHONPATH:\n            |  %s\n            |Also, check if you have a sitecustomize.py module in your python path,\n            |or in your python installation, that is printing to standard output").format(Predef$.MODULE$.genericWrapArray(new Object[]{daemonModule, boxToInteger, BoxesRunTime.boxToInteger(daemonPort), ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(asList).asScala()).mkString(" "), pythonPath()})))).stripMargin());
            } catch (EOFException e) {
                if (!daemon().isAlive()) {
                    throw new SparkException(new StringBuilder(94).append("EOFException occurred while reading the port number from ").append(daemonModule()).append("'s").append(" stdout and terminated with code: ").append(daemon().exitValue()).append(".").toString());
                }
                throw new SparkException(new StringBuilder(66).append("EOFException occurred while reading the port number ").append("from ").append(daemonModule()).append("'s stdout").toString());
            }
        } catch (Exception e2) {
            String str = (String) Option$.MODULE$.apply(daemon()).flatMap(process -> {
                return Utils$.MODULE$.getStderr(process, PythonWorkerFactory$.MODULE$.PROCESS_WAIT_TIMEOUT_MS());
            }).getOrElse(() -> {
                return "";
            });
            stopDaemon();
            if (str != null ? str.equals("") : "" == 0) {
                throw e2;
            }
            SparkException sparkException = new SparkException(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(124).append("\n              |Error from python worker:\n              |  ").append(str.replace("\n", "\n  ")).append("\n              |PYTHONPATH was:\n              |  ").append(pythonPath()).append("\n              |").append(e2).toString())).stripMargin());
            sparkException.setStackTrace(e2.getStackTrace());
            throw sparkException;
        }
    }

    private void redirectStreamsToStderr(InputStream inputStream, InputStream inputStream2) {
        try {
            new RedirectThread(inputStream, System.err, new StringBuilder(18).append("stdout reader for ").append(this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec).toString(), RedirectThread$.MODULE$.$lessinit$greater$default$4()).start();
            new RedirectThread(inputStream2, System.err, new StringBuilder(18).append("stderr reader for ").append(this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec).toString(), RedirectThread$.MODULE$.$lessinit$greater$default$4()).start();
        } catch (Exception e) {
            logError(() -> {
                return "Exception in redirecting streams";
            }, e);
        }
    }

    public void org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers() {
        while (idleWorkers().nonEmpty()) {
            try {
                ((Socket) idleWorkers().dequeue()).close();
            } catch (Exception e) {
                logWarning(() -> {
                    return "Failed to close worker socket";
                }, e);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void stopDaemon() {
        synchronized (this) {
            if (useDaemon()) {
                org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers();
                if (daemon() != null) {
                    daemon().destroy();
                }
                daemon_$eq(null);
                daemonPort_$eq(0);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                simpleWorkers().mapValues(process -> {
                    process.destroy();
                    return BoxedUnit.UNIT;
                });
            }
        }
    }

    public void stop() {
        stopDaemon();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void stopWorker(Socket socket) {
        synchronized (this) {
            if (!useDaemon()) {
                simpleWorkers().get(socket).foreach(process -> {
                    process.destroy();
                    return BoxedUnit.UNIT;
                });
            } else if (daemon() != null) {
                daemonWorkers().get(socket).foreach(i -> {
                    DataOutputStream dataOutputStream = new DataOutputStream(this.daemon().getOutputStream());
                    dataOutputStream.writeInt(i);
                    dataOutputStream.flush();
                    this.daemon().getOutputStream().flush();
                });
            }
        }
        socket.close();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void releaseWorker(Socket socket) {
        if (useDaemon()) {
            synchronized (this) {
                org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs_$eq(System.nanoTime());
                idleWorkers().enqueue(Predef$.MODULE$.wrapRefArray(new Socket[]{socket}));
            }
        } else {
            try {
                socket.close();
            } catch (Exception e) {
                logWarning(() -> {
                    return "Failed to close worker socket";
                }, e);
            }
        }
    }

    private final Socket createSocket$1() {
        Socket socket = new Socket(daemonHost(), daemonPort());
        int readInt = new DataInputStream(socket.getInputStream()).readInt();
        if (readInt < 0) {
            throw new IllegalStateException(new StringBuilder(48).append("Python daemon failed to launch worker with code ").append(readInt).toString());
        }
        authHelper().authToServer(socket);
        daemonWorkers().put(socket, BoxesRunTime.boxToInteger(readInt));
        return socket;
    }

    private final Socket liftedTree1$1() {
        try {
            return createSocket$1();
        } catch (SocketException e) {
            logWarning(() -> {
                return "Failed to open socket to Python daemon:";
            }, e);
            logWarning(() -> {
                return "Assuming that daemon unexpectedly quit, attempting to restart";
            });
            stopDaemon();
            startDaemon();
            return createSocket$1();
        }
    }

    public PythonWorkerFactory(String str, Map<String, String> map) {
        this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec = str;
        this.envVars = map;
        org$apache$spark$internal$Logging$$log__$eq(null);
        this.useDaemon = !System.getProperty("os.name").startsWith("Windows") && BoxesRunTime.unboxToBoolean(SparkEnv$.MODULE$.get().conf().get(Python$.MODULE$.PYTHON_USE_DAEMON()));
        this.daemonModule = (String) ((Option) SparkEnv$.MODULE$.get().conf().get(Python$.MODULE$.PYTHON_DAEMON_MODULE())).map(str2 -> {
            this.logInfo(() -> {
                return new StringBuilder(186).append("Python daemon module in PySpark is set to [").append(str2).append("] in '").append(Python$.MODULE$.PYTHON_DAEMON_MODULE().key()).append("', ").append("using this to start the daemon up. Note that this configuration only has an effect when ").append("'").append(Python$.MODULE$.PYTHON_USE_DAEMON().key()).append("' is enabled and the platform is not Windows.").toString();
            });
            return str2;
        }).getOrElse(() -> {
            return "pyspark.daemon";
        });
        this.workerModule = (String) ((Option) SparkEnv$.MODULE$.get().conf().get(Python$.MODULE$.PYTHON_WORKER_MODULE())).map(str3 -> {
            this.logInfo(() -> {
                return new StringBuilder(182).append("Python worker module in PySpark is set to [").append(str3).append("] in '").append(Python$.MODULE$.PYTHON_WORKER_MODULE().key()).append("', ").append("using this to start the worker up. Note that this configuration only has an effect when ").append("'").append(Python$.MODULE$.PYTHON_USE_DAEMON().key()).append("' is disabled or the platform is Windows.").toString();
            });
            return str3;
        }).getOrElse(() -> {
            return "pyspark.worker";
        });
        this.authHelper = new SocketAuthHelper(SparkEnv$.MODULE$.get().conf());
        this.daemon = null;
        this.daemonHost = InetAddress.getByAddress((byte[]) Array$.MODULE$.apply(Predef$.MODULE$.wrapByteArray(new byte[]{Byte.MAX_VALUE, 0, 0, 1}), ClassTag$.MODULE$.Byte()));
        this.daemonPort = 0;
        this.daemonWorkers = new WeakHashMap<>();
        this.idleWorkers = new Queue<>();
        this.org$apache$spark$api$python$PythonWorkerFactory$$lastActivityNs = 0L;
        new MonitorThread(this).start();
        this.simpleWorkers = new WeakHashMap<>();
        this.pythonPath = PythonUtils$.MODULE$.mergePythonPaths(Predef$.MODULE$.wrapRefArray(new String[]{PythonUtils$.MODULE$.sparkPythonPath(), (String) map.getOrElse("PYTHONPATH", () -> {
            return "";
        }), (String) package$.MODULE$.env().getOrElse("PYTHONPATH", () -> {
            return "";
        })}));
    }
}
