package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.SparkClientFactory;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;

/* loaded from: input_file:WEB-INF/lib/hive-exec-1.2.0-mapr-1611.jar:org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.class */
public class RemoteHiveSparkClient implements HiveSparkClient {
    private static final long serialVersionUID = 1;
    private static final String MR_JAR_PROPERTY = "tmpjars";
    protected static final transient Log LOG = LogFactory.getLog(RemoteHiveSparkClient.class);
    private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
    private transient SparkClient remoteClient;
    private transient SparkConf sparkConf;
    private transient HiveConf hiveConf;
    private transient List<URI> localJars = new ArrayList();
    private transient List<URI> localFiles = new ArrayList();
    private final transient long sparkClientTimtout;

    /* loaded from: input_file:WEB-INF/lib/hive-exec-1.2.0-mapr-1611.jar:org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient$JobStatusJob.class */
    private static class JobStatusJob implements Job<Serializable> {
        private final byte[] jobConfBytes;
        private final byte[] scratchDirBytes;
        private final byte[] sparkWorkBytes;

        private JobStatusJob() {
            this(null, null, null);
        }

        JobStatusJob(byte[] bArr, byte[] bArr2, byte[] bArr3) {
            this.jobConfBytes = bArr;
            this.scratchDirBytes = bArr2;
            this.sparkWorkBytes = bArr3;
        }

        @Override // org.apache.hive.spark.client.Job
        public Serializable call(JobContext jobContext) throws Exception {
            JobConf deserializeJobConf = KryoSerializer.deserializeJobConf(this.jobConfBytes);
            List<String> addedJars = jobContext.getAddedJars();
            if (addedJars != null && !addedJars.isEmpty()) {
                SparkClientUtilities.addToClassPath((String[]) addedJars.toArray(new String[addedJars.size()]), deserializeJobConf, jobContext.getLocalTmpDir());
                deserializeJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
            }
            Path path = (Path) KryoSerializer.deserialize(this.scratchDirBytes, Path.class);
            SparkWork sparkWork = (SparkWork) KryoSerializer.deserialize(this.sparkWorkBytes, SparkWork.class);
            logConfigurations(deserializeJobConf);
            SparkCounters sparkCounters = new SparkCounters(jobContext.sc());
            Map<String, List<String>> requiredCounterPrefix = sparkWork.getRequiredCounterPrefix();
            if (requiredCounterPrefix != null) {
                for (String str : requiredCounterPrefix.keySet()) {
                    Iterator<String> it = requiredCounterPrefix.get(str).iterator();
                    while (it.hasNext()) {
                        sparkCounters.createCounter(str, it.next());
                    }
                }
            }
            SparkPlan generate = new SparkPlanGenerator(jobContext.sc(), null, deserializeJobConf, path, new SparkReporter(sparkCounters)).generate(sparkWork);
            jobContext.monitor(generate.generateGraph().foreachAsync(HiveVoidFunction.getInstance()), sparkCounters, generate.getCachedRDDIds());
            return null;
        }

        private void logConfigurations(JobConf jobConf) {
            if (RemoteHiveSparkClient.LOG.isInfoEnabled()) {
                RemoteHiveSparkClient.LOG.info("Logging job configuration: ");
                StringWriter stringWriter = new StringWriter();
                try {
                    Configuration.dumpConfiguration(jobConf, stringWriter);
                } catch (IOException e) {
                    RemoteHiveSparkClient.LOG.warn("Error logging job configuration", e);
                }
                RemoteHiveSparkClient.LOG.info(stringWriter.toString());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> map) throws IOException, SparkException {
        this.hiveConf = hiveConf;
        this.sparkConf = HiveSparkClientFactory.generateSparkConf(map);
        this.remoteClient = SparkClientFactory.createClient(map, hiveConf);
        this.sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient
    public SparkConf getSparkConf() {
        return this.sparkConf;
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient
    public int getExecutorCount() throws Exception {
        return this.remoteClient.getExecutorCount().get(this.sparkClientTimtout, TimeUnit.SECONDS).intValue();
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient
    public int getDefaultParallelism() throws Exception {
        return this.remoteClient.getDefaultParallelism().get(this.sparkClientTimtout, TimeUnit.SECONDS).intValue();
    }

    @Override // org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient
    public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
        Context ctx = driverContext.getCtx();
        HiveConf hiveConf = (HiveConf) ctx.getConf();
        refreshLocalResources(sparkWork, hiveConf);
        JobConf jobConf = new JobConf(hiveConf);
        Path mRTmpPath = ctx.getMRTmpPath();
        mRTmpPath.getFileSystem(jobConf).mkdirs(mRTmpPath);
        JobHandle submit = this.remoteClient.submit(new JobStatusJob(KryoSerializer.serializeJobConf(jobConf), KryoSerializer.serialize(mRTmpPath), KryoSerializer.serialize(sparkWork)));
        return new RemoteSparkJobRef(hiveConf, submit, new RemoteSparkJobStatus(this.remoteClient, submit, this.sparkClientTimtout));
    }

    private void refreshLocalResources(SparkWork sparkWork, HiveConf hiveConf) throws IOException {
        addJars(new JobConf(getClass()).getJar());
        addJars(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVEAUXJARS));
        String resourceFiles = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.JAR);
        HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVEADDEDJARS, resourceFiles);
        addJars(resourceFiles);
        JobConf jobConf = new JobConf(hiveConf);
        jobConf.set(MR_JAR_PROPERTY, "");
        Iterator<BaseWork> it = sparkWork.getAllWork().iterator();
        while (it.hasNext()) {
            it.next().configureJobConf(jobConf);
        }
        addJars(hiveConf.get(MR_JAR_PROPERTY));
        String resourceFiles2 = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.FILE);
        HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVEADDEDFILES, resourceFiles2);
        addResources(resourceFiles2);
        String resourceFiles3 = Utilities.getResourceFiles(hiveConf, SessionState.ResourceType.ARCHIVE);
        HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVEADDEDARCHIVES, resourceFiles3);
        addResources(resourceFiles3);
    }

    private void addResources(String str) throws IOException {
        for (String str2 : CSV_SPLITTER.split(Strings.nullToEmpty(str))) {
            try {
                URI uri = SparkUtilities.getURI(str2);
                if (uri != null && !this.localFiles.contains(uri)) {
                    if (SparkUtilities.needUploadToHDFS(uri, this.sparkConf)) {
                        uri = SparkUtilities.uploadToHDFS(uri, this.hiveConf);
                    }
                    this.localFiles.add(uri);
                    this.remoteClient.addFile(uri);
                }
            } catch (URISyntaxException e) {
                LOG.warn("Failed to add file:" + str2, e);
            }
        }
    }

    private void addJars(String str) throws IOException {
        for (String str2 : CSV_SPLITTER.split(Strings.nullToEmpty(str))) {
            try {
                URI uri = SparkUtilities.getURI(str2);
                if (uri != null && !this.localJars.contains(uri)) {
                    if (SparkUtilities.needUploadToHDFS(uri, this.sparkConf)) {
                        uri = SparkUtilities.uploadToHDFS(uri, this.hiveConf);
                    }
                    this.localJars.add(uri);
                    this.remoteClient.addJar(uri);
                }
            } catch (URISyntaxException e) {
                LOG.warn("Failed to add jar:" + str2, e);
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.remoteClient.stop();
    }
}
