/*
 * Decompiled with CFR 0.152.
 */
package com.cloudera.livy.rsc.driver;

import com.cloudera.livy.JobContext;
import com.cloudera.livy.client.common.Serializer;
import com.cloudera.livy.rsc.BaseProtocol;
import com.cloudera.livy.rsc.BypassJobStatus;
import com.cloudera.livy.rsc.FutureListener;
import com.cloudera.livy.rsc.RSCConf;
import com.cloudera.livy.rsc.Utils;
import com.cloudera.livy.rsc.driver.BypassJob;
import com.cloudera.livy.rsc.driver.BypassJobWrapper;
import com.cloudera.livy.rsc.driver.JobContextImpl;
import com.cloudera.livy.rsc.driver.JobWrapper;
import com.cloudera.livy.rsc.driver.MutableClassLoader;
import com.cloudera.livy.rsc.rpc.Rpc;
import com.cloudera.livy.rsc.rpc.RpcDispatcher;
import com.cloudera.livy.rsc.rpc.RpcServer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class RSCDriver
extends BaseProtocol {
    private static final Logger LOG = LoggerFactory.getLogger(RSCDriver.class);
    private final Serializer serializer;
    private final Object jcLock;
    private final Object shutdownLock;
    private final ExecutorService executor;
    private final File localTmpDir;
    private final List<JobWrapper<?>> jobQueue;
    protected final Collection<Rpc> clients;
    final Map<String, JobWrapper<?>> activeJobs;
    private final Collection<BypassJobWrapper> bypassJobs;
    private RpcServer server;
    private volatile JobContextImpl jc;
    private volatile boolean running;
    protected final SparkConf conf;
    protected final RSCConf livyConf;
    private final AtomicReference<ScheduledFuture<?>> idleTimeout;

    public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwx------");
        this.localTmpDir = Files.createTempDirectory("rsc-tmp", PosixFilePermissions.asFileAttribute(perms)).toFile();
        this.executor = Executors.newCachedThreadPool();
        this.jobQueue = new LinkedList();
        this.clients = new ConcurrentLinkedDeque<Rpc>();
        this.serializer = new Serializer(new Class[0]);
        this.conf = conf;
        this.livyConf = livyConf;
        this.jcLock = new Object();
        this.shutdownLock = new Object();
        this.activeJobs = new ConcurrentHashMap();
        this.bypassJobs = new ConcurrentLinkedDeque<BypassJobWrapper>();
        this.idleTimeout = new AtomicReference();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void shutdown() {
        if (!this.running) {
            return;
        }
        this.running = false;
        for (JobWrapper<?> job : this.activeJobs.values()) {
            job.cancel();
        }
        try {
            this.shutdownContext();
        }
        catch (Exception e) {
            LOG.warn("Error during shutdown.", (Throwable)e);
        }
        try {
            this.shutdownServer();
        }
        catch (Exception e) {
            LOG.warn("Error during shutdown.", (Throwable)e);
        }
        Object object = this.shutdownLock;
        synchronized (object) {
            this.shutdownLock.notifyAll();
        }
        object = this.jcLock;
        synchronized (object) {
            this.jcLock.notifyAll();
        }
    }

    private void initializeServer() throws Exception {
        String clientId = this.livyConf.get(RSCConf.Entry.CLIENT_ID);
        Utils.checkArgument(clientId != null, "No client ID provided.", new Object[0]);
        String secret = this.livyConf.get(RSCConf.Entry.CLIENT_SECRET);
        Utils.checkArgument(secret != null, "No secret provided.", new Object[0]);
        String launcherAddress = this.livyConf.get(RSCConf.Entry.LAUNCHER_ADDRESS);
        Utils.checkArgument(launcherAddress != null, "Missing launcher address.", new Object[0]);
        int launcherPort = this.livyConf.getInt(RSCConf.Entry.LAUNCHER_PORT);
        Utils.checkArgument(launcherPort > 0, "Missing launcher port.", new Object[0]);
        LOG.info("Connecting to: {}:{}", (Object)launcherAddress, (Object)launcherPort);
        this.livyConf.set(RSCConf.Entry.RPC_SERVER_ADDRESS, null);
        LOG.info("Starting RPC server...");
        this.server = new RpcServer(this.livyConf);
        this.server.registerClient(clientId, secret, new RpcServer.ClientCallback(){

            @Override
            public RpcDispatcher onNewClient(Rpc client) {
                RSCDriver.this.registerClient(client);
                return RSCDriver.this;
            }
        });
        try (Rpc callbackRpc = (Rpc)Rpc.createClient(this.livyConf, this.server.getEventLoopGroup(), launcherAddress, launcherPort, clientId, secret, this).get();){
            callbackRpc.call(new BaseProtocol.RemoteDriverAddress(this.server.getAddress(), this.server.getPort())).get(this.livyConf.getTimeAsMs(RSCConf.Entry.RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
        }
        this.setupIdleTimeout();
    }

    private void registerClient(final Rpc client) {
        this.clients.add(client);
        this.stopIdleTimeout();
        Utils.addListener(client.getChannel().closeFuture(), new FutureListener<Void>(){

            @Override
            public void onSuccess(Void unused) {
                RSCDriver.this.clients.remove(client);
                RSCDriver.this.setupIdleTimeout();
            }
        });
        LOG.debug("Registered new connection from {}.", (Object)client.getChannel());
    }

    private void setupIdleTimeout() {
        if (this.clients.size() > 0) {
            return;
        }
        Runnable timeoutTask = new Runnable(){

            @Override
            public void run() {
                LOG.warn("Shutting down RSC due to idle timeout ({}).", (Object)RSCDriver.this.livyConf.get(RSCConf.Entry.SERVER_IDLE_TIMEOUT));
                RSCDriver.this.shutdown();
            }
        };
        ScheduledFuture timeout = this.server.getEventLoopGroup().schedule(timeoutTask, this.livyConf.getTimeAsMs(RSCConf.Entry.SERVER_IDLE_TIMEOUT), TimeUnit.MILLISECONDS);
        if (!this.idleTimeout.compareAndSet(null, timeout)) {
            LOG.debug("Timeout task already registered.");
            timeout.cancel(false);
        }
        if (this.clients.size() > 0) {
            this.stopIdleTimeout();
        }
    }

    private void stopIdleTimeout() {
        ScheduledFuture idleTimeout = this.idleTimeout.getAndSet(null);
        if (idleTimeout != null) {
            LOG.debug("Cancelling idle timeout since new client connected.");
            idleTimeout.cancel(false);
        }
    }

    protected JavaSparkContext initializeContext() throws Exception {
        long t1 = System.nanoTime();
        LOG.info("Starting Spark context...");
        JavaSparkContext sc = new JavaSparkContext(this.conf);
        LOG.info("Spark context finished initialization in {}ms", (Object)TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1));
        return sc;
    }

    protected void shutdownContext() {
        if (this.jc != null) {
            this.jc.stop();
        }
        this.executor.shutdownNow();
        try {
            FileUtils.deleteDirectory((File)this.localTmpDir);
        }
        catch (IOException e) {
            LOG.warn("Failed to delete local tmp dir: " + this.localTmpDir, (Throwable)e);
        }
    }

    private void shutdownServer() {
        if (this.server != null) {
            this.server.close();
        }
        for (Rpc client : this.clients) {
            client.close();
        }
    }

    private void broadcast(Object msg) {
        for (Rpc client : this.clients) {
            try {
                client.call(msg);
            }
            catch (Exception e) {
                LOG.warn("Failed to send message to client " + client, (Throwable)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void run() throws Exception {
        this.running = true;
        MutableClassLoader driverClassLoader = new MutableClassLoader(Thread.currentThread().getContextClassLoader());
        Thread.currentThread().setContextClassLoader(driverClassLoader);
        try {
            this.initializeServer();
            JavaSparkContext sc = this.initializeContext();
            Object object = this.jcLock;
            synchronized (object) {
                this.jc = new JobContextImpl(sc, this.localTmpDir, this);
                this.jcLock.notifyAll();
            }
            object = this.jcLock;
            synchronized (object) {
                for (JobWrapper<?> job : this.jobQueue) {
                    this.submit(job);
                }
                this.jobQueue.clear();
            }
            object = this.shutdownLock;
            synchronized (object) {
                try {
                    while (this.running) {
                        this.shutdownLock.wait();
                    }
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
        }
        finally {
            this.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void submit(JobWrapper<?> job) {
        if (this.jc != null) {
            job.submit(this.executor);
            return;
        }
        Object object = this.jcLock;
        synchronized (object) {
            if (this.jc != null) {
                job.submit(this.executor);
            } else {
                LOG.info("SparkContext not yet up, queueing job request.");
                this.jobQueue.add(job);
            }
        }
    }

    JobContextImpl jobContext() {
        return this.jc;
    }

    Serializer serializer() {
        return this.serializer;
    }

    <T> void jobFinished(String jobId, T result, Throwable error) {
        LOG.debug("Send job({}) result to Client.", (Object)jobId);
        this.broadcast(new BaseProtocol.JobResult<T>(jobId, result, error));
    }

    void jobStarted(String jobId) {
        this.broadcast(new BaseProtocol.JobStarted(jobId));
    }

    public void handle(ChannelHandlerContext ctx, BaseProtocol.CancelJob msg) {
        JobWrapper<?> job = this.activeJobs.get(msg.id);
        if (job == null || !job.cancel()) {
            LOG.info("Requested to cancel an already finished job.");
        }
    }

    public void handle(ChannelHandlerContext ctx, BaseProtocol.EndSession msg) {
        if (this.livyConf.getBoolean(RSCConf.Entry.TEST_STUCK_END_SESSION)) {
            LOG.warn("Ignoring EndSession request because TEST_STUCK_END_SESSION is set.");
        } else {
            LOG.debug("Shutting down due to EndSession request.");
            this.shutdown();
        }
    }

    public void handle(ChannelHandlerContext ctx, BaseProtocol.JobRequest<?> msg) {
        LOG.info("Received job request {}", (Object)msg.id);
        JobWrapper wrapper = new JobWrapper(this, msg.id, msg.job);
        this.activeJobs.put(msg.id, wrapper);
        this.submit(wrapper);
    }

    public void handle(ChannelHandlerContext ctx, BaseProtocol.BypassJobRequest msg) throws Exception {
        LOG.info("Received bypass job request {}", (Object)msg.id);
        BypassJobWrapper wrapper = this.createWrapper(msg);
        this.bypassJobs.add(wrapper);
        this.activeJobs.put(msg.id, wrapper);
        if (msg.synchronous) {
            this.waitForJobContext();
            try {
                wrapper.call();
            }
            catch (Throwable throwable) {}
        } else {
            this.submit(wrapper);
        }
    }

    protected BypassJobWrapper createWrapper(BaseProtocol.BypassJobRequest msg) throws Exception {
        return new BypassJobWrapper(this, msg.id, new BypassJob(this.serializer(), msg.serializedJob));
    }

    public Object handle(ChannelHandlerContext ctx, BaseProtocol.SyncJobRequest msg) throws Exception {
        this.waitForJobContext();
        return msg.job.call((JobContext)this.jc);
    }

    public BypassJobStatus handle(ChannelHandlerContext ctx, BaseProtocol.GetBypassJobStatus msg) {
        Iterator<BypassJobWrapper> it = this.bypassJobs.iterator();
        while (it.hasNext()) {
            BypassJobWrapper job = it.next();
            if (!job.jobId.equals(msg.id)) continue;
            BypassJobStatus status = job.getStatus();
            switch (status.state) {
                case CANCELLED: 
                case FAILED: 
                case SUCCEEDED: {
                    it.remove();
                    break;
                }
            }
            return status;
        }
        throw new NoSuchElementException(msg.id);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForJobContext() throws InterruptedException {
        Object object = this.jcLock;
        synchronized (object) {
            while (this.jc == null) {
                this.jcLock.wait();
                if (this.running) continue;
                throw new IllegalStateException("Remote context is shutting down.");
            }
        }
    }

    protected void addFile(String path) {
        this.jc.sc().addFile(path);
    }

    protected void addJarOrPyFile(String path) throws Exception {
        File localCopyDir = new File(this.jc.getLocalTmpDir(), "__livy__");
        File localCopy = this.copyFileToLocal(localCopyDir, path, this.jc.sc().sc());
        this.addLocalFileToClassLoader(localCopy);
        this.jc.sc().addJar(path);
    }

    public void addLocalFileToClassLoader(File localCopy) throws MalformedURLException {
        MutableClassLoader cl = (MutableClassLoader)Thread.currentThread().getContextClassLoader();
        cl.addURL(localCopy.toURI().toURL());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public File copyFileToLocal(File localCopyDir, String filePath, SparkContext sc) throws Exception {
        JobContextImpl jobContextImpl = this.jc;
        synchronized (jobContextImpl) {
            if (!localCopyDir.isDirectory() && !localCopyDir.mkdir()) {
                throw new IOException("Failed to create directory to add pyFile");
            }
        }
        URI uri = new URI(filePath);
        String name = uri.getFragment() != null ? uri.getFragment() : uri.getPath();
        name = new File(name).getName();
        File localCopy = new File(localCopyDir, name);
        if (localCopy.exists()) {
            throw new IOException(String.format("A file with name %s has already been uploaded.", name));
        }
        Configuration conf = sc.hadoopConfiguration();
        FileSystem fs = FileSystem.get((URI)uri, (Configuration)conf);
        fs.copyToLocalFile(new Path(uri), new Path(localCopy.toURI()));
        return localCopy;
    }
}

