/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds the Spark configuration for a Hive session and creates the matching
 * {@link HiveSparkClient}.
 *
 * <p>The configuration is assembled in layers, later layers overriding earlier ones:
 * built-in defaults, then {@code spark-defaults.conf} from the classpath, then selected
 * properties copied from the {@link HiveConf}.
 */
public class HiveSparkClientFactory {
    protected static final Logger LOG = LoggerFactory.getLogger(HiveSparkClientFactory.class);

    private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
    private static final String SPARK_DEFAULT_MASTER = "yarn";
    private static final String SPARK_DEFAULT_DEPLOY_MODE = "cluster";
    private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
    private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
    private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
    private static final String SPARK_DEFAULT_WAIT_APP_COMPLETE = "false";
    private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
    private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
    private static final String SPARK_MASTER = "spark.master";
    private static final String SPARK_APP_NAME = "spark.app.name";
    private static final String SPARK_YARN_QUEUE = "spark.yarn.queue";
    private static final String SPARK_KRYO_CLASSES_TO_REGISTER = "spark.kryo.classesToRegister";
    private static final String SPARK_HADOOP_PREFIX = "spark.hadoop.";
    // Prefix of YARN property names; deliberately separate from SPARK_DEFAULT_MASTER even
    // though both currently hold the same text.
    private static final String YARN_PROPERTY_PREFIX = "yarn";
    @VisibleForTesting
    public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";

    /**
     * Creates a Spark client for the given Hive configuration.
     *
     * @param hiveconf  Hive configuration; it is updated with the resolved master and deploy mode
     * @param sessionId session identifier appended to the default Spark application name; may be null
     * @return a local client when the resolved master is {@code local} or {@code local[...]},
     *         otherwise a remote client
     * @throws Exception if the underlying client cannot be created
     */
    public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
        Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId);
        String master = sparkConf.get(SPARK_MASTER);
        if (master.equals("local") || master.startsWith("local[")) {
            return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf), hiveconf);
        }
        return new RemoteHiveSparkClient(hiveconf, sparkConf, sessionId);
    }

    /**
     * Assembles the Spark properties for a session.
     *
     * <p>Side effects: HBase resources are added to {@code hiveConf}, and the resolved
     * {@code spark.master} / {@code spark.submit.deployMode} are written back to
     * {@code hiveConf} (when it did not define them) and to the current
     * {@link SessionState} configuration, if one exists.
     *
     * @param hiveConf  Hive configuration to read from (and update as described above)
     * @param sessionId session identifier used in the application name; may be null
     * @return a mutable map of Spark property names to values
     */
    public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) {
        Map<String, String> sparkConf = new HashMap<String, String>();
        HBaseConfiguration.addHbaseResources(hiveConf);

        // Layer 1: built-in defaults.
        sparkConf.put(SPARK_MASTER, SPARK_DEFAULT_MASTER);
        // NOTE(review): when hiveConf defines spark.app.name, the copy loop in
        // copyHiveProperties later overwrites this value with the raw hiveConf value, so the
        // session-id suffix only survives for the default application name. Confirm intent.
        sparkConf.put(SPARK_APP_NAME, resolveAppName(hiveConf, sessionId));
        sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
        sparkConf.put("spark.kryo.referenceTracking", SPARK_DEFAULT_REFERENCE_TRACKING);

        // Layer 2: spark-defaults.conf from the classpath.
        loadSparkDefaults(sparkConf);

        // Resolve master and deploy mode before copying hiveConf, because the resolved
        // values are written into hiveConf and therefore picked up by the copy below.
        String sparkMaster = hiveConf.get(SPARK_MASTER);
        if (sparkMaster == null) {
            sparkMaster = sparkConf.get(SPARK_MASTER);
            hiveConf.set(SPARK_MASTER, sparkMaster);
        }
        String deployMode = resolveDeployMode(hiveConf, sparkConf, sparkMaster);

        SessionState session = SessionState.get();
        if (session != null && session.getConf() != null) {
            session.getConf().set(SPARK_MASTER, sparkMaster);
            if (deployMode != null) {
                session.getConf().set(SPARK_DEPLOY_MODE, deployMode);
            }
        }

        boolean yarnClusterMode = SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode);
        if (yarnClusterMode) {
            // Set before the hiveConf copy so an explicit hiveConf value can override it.
            sparkConf.put("spark.yarn.maxAppAttempts", "1");
        }

        // Layer 3: properties from the Hive configuration.
        copyHiveProperties(hiveConf, sparkMaster, sparkConf);

        configureKryo(hiveConf, sparkConf);

        if (SparkClientUtilities.isYarnMaster(sparkMaster) && hiveConf.get(SPARK_YARN_QUEUE) == null) {
            // Fall back to the MapReduce queue name when no Spark queue is configured.
            String queueName = hiveConf.get("mapreduce.job.queuename");
            if (queueName != null) {
                sparkConf.put(SPARK_YARN_QUEUE, queueName);
            }
        }
        if (yarnClusterMode) {
            sparkConf.putIfAbsent(SPARK_WAIT_APP_COMPLETE, SPARK_DEFAULT_WAIT_APP_COMPLETE);
        }
        sparkConf.putIfAbsent(SPARK_CLONE_CONFIGURATION, "true");

        String password = HiveConfUtil.getJobCredentialProviderPassword(hiveConf);
        if (password != null) {
            addCredentialProviderPassword(sparkConf, password);
        }
        return sparkConf;
    }

    /** Returns the configured (or default) application name with the session id appended when known. */
    private static String resolveAppName(HiveConf hiveConf, String sessionId) {
        String sessionIdString = " (sessionId = " + sessionId + ")";
        String configuredName = hiveConf.get(SPARK_APP_NAME);
        if (configuredName != null) {
            return configuredName + sessionIdString;
        }
        return sessionId == null ? SPARK_DEFAULT_APP_NAME : SPARK_DEFAULT_APP_NAME + sessionIdString;
    }

    /** Copies every {@code spark*} property of the classpath {@code spark-defaults.conf}, if present, into the map. */
    private static void loadSparkDefaults(Map<String, String> sparkConf) {
        // A null resource is permitted by try-with-resources and is simply not closed.
        // An IOException raised by close() is reported through the same handler as a read failure.
        try (InputStream inputStream =
                 HiveSparkClientFactory.class.getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE)) {
            if (inputStream == null) {
                return;
            }
            LOG.info("loading spark properties from: " + SPARK_DEFAULT_CONF_FILE);
            Properties properties = new Properties();
            properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
            for (String propertyName : properties.stringPropertyNames()) {
                if (!propertyName.startsWith("spark")) {
                    continue;
                }
                String value = properties.getProperty(propertyName);
                sparkConf.put(propertyName, value);
                LOG.debug("load spark property from {} ({} -> {}).", SPARK_DEFAULT_CONF_FILE, propertyName,
                    LogUtils.maskIfPassword(propertyName, value));
            }
        } catch (IOException e) {
            LOG.info("Failed to open spark configuration file: " + SPARK_DEFAULT_CONF_FILE, e);
        }
    }

    /**
     * Resolves the deploy mode for a non-local master, trying in order: hiveConf, the values
     * loaded so far into {@code sparkConf}, the mode derived from the master string, and the
     * built-in default. A value not taken from hiveConf is written back to hiveConf.
     *
     * @return the deploy mode, or null when the master is local
     */
    private static String resolveDeployMode(HiveConf hiveConf, Map<String, String> sparkConf, String sparkMaster) {
        if (SparkClientUtilities.isLocalMaster(sparkMaster)) {
            return null;
        }
        String deployMode = hiveConf.get(SPARK_DEPLOY_MODE);
        if (deployMode != null) {
            return deployMode;
        }
        deployMode = sparkConf.get(SPARK_DEPLOY_MODE);
        if (deployMode == null) {
            deployMode = SparkClientUtilities.getDeployModeFromMaster(sparkMaster);
        }
        if (deployMode == null) {
            deployMode = SPARK_DEFAULT_DEPLOY_MODE;
        }
        hiveConf.set(SPARK_DEPLOY_MODE, deployMode);
        return deployMode;
    }

    /**
     * Copies the relevant hiveConf properties into {@code sparkConf}: {@code spark*} as is;
     * {@code yarn*} (YARN masters only), {@code fs.defaultFS}, {@code hbase*} and
     * {@code zookeeper.znode*} under the {@code spark.hadoop.} prefix; {@code oozie*} under the
     * {@code spark.} prefix; and the RSC settings listed in {@link RpcConfiguration}.
     */
    private static void copyHiveProperties(HiveConf hiveConf, String sparkMaster, Map<String, String> sparkConf) {
        boolean yarnMaster = SparkClientUtilities.isYarnMaster(sparkMaster);
        for (Map.Entry<String, String> entry : hiveConf) {
            String propertyName = entry.getKey();
            if (propertyName.startsWith("spark")) {
                String value = hiveConf.get(propertyName);
                sparkConf.put(propertyName, value);
                LOG.debug("load spark property from hive configuration ({} -> {}).", propertyName,
                    LogUtils.maskIfPassword(propertyName, value));
            } else if (yarnMaster && propertyName.startsWith(YARN_PROPERTY_PREFIX)) {
                String value = hiveConf.get(propertyName);
                sparkConf.put(SPARK_HADOOP_PREFIX + propertyName, value);
                LOG.debug("load yarn property from hive configuration in {} mode ({} -> {}).", sparkMaster,
                    propertyName, LogUtils.maskIfPassword(propertyName, value));
            } else if (propertyName.equals("fs.defaultFS")) {
                String value = hiveConf.get(propertyName);
                if (value != null && !value.isEmpty()) {
                    sparkConf.put(SPARK_HADOOP_PREFIX + propertyName, value);
                }
            } else if (propertyName.startsWith("hbase") || propertyName.startsWith("zookeeper.znode")) {
                String value = hiveConf.get(propertyName);
                sparkConf.put(SPARK_HADOOP_PREFIX + propertyName, value);
                LOG.debug("load HBase configuration ({} -> {}).", propertyName,
                    LogUtils.maskIfPassword(propertyName, value));
            } else if (propertyName.startsWith("oozie")) {
                String value = hiveConf.get(propertyName);
                sparkConf.put("spark." + propertyName, value);
                LOG.debug("Pass Oozie configuration ({} -> {}).", propertyName,
                    LogUtils.maskIfPassword(propertyName, value));
            }

            // Checked independently of the branches above, so an RSC setting replaces
            // whatever value one of those branches stored under the same key.
            if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
                String value = RpcConfiguration.getValue(hiveConf, propertyName);
                sparkConf.put(propertyName, value);
                LOG.debug("load RPC property from hive configuration ({} -> {}).", propertyName,
                    LogUtils.maskIfPassword(propertyName, value));
            }
        }
    }

    /**
     * Merges the classes Hive needs into {@code spark.kryo.classesToRegister}, or installs the
     * Hive Kryo registrator when the optimized shuffle serde is enabled.
     */
    private static void configureKryo(HiveConf hiveConf, Map<String, String> sparkConf) {
        Set<String> classes = Sets.newHashSet(
            Splitter.on(",").trimResults().omitEmptyStrings()
                .split(Strings.nullToEmpty(sparkConf.get(SPARK_KRYO_CLASSES_TO_REGISTER))));
        classes.add(Writable.class.getName());
        classes.add(VectorizedRowBatch.class.getName());
        if (hiveConf.getBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) {
            sparkConf.put("spark.kryo.registrator", "org.apache.hive.spark.HiveKryoRegistrator");
        } else {
            classes.add(HiveKey.class.getName());
            classes.add(BytesWritable.class.getName());
        }
        sparkConf.put(SPARK_KRYO_CLASSES_TO_REGISTER, Joiner.on(",").join(classes));
    }

    /** Stores the credential-store password as an environment variable setting for the YARN application master and the executors. */
    private static void addCredentialProviderPassword(Map<String, String> sparkConf, String jobCredstorePassword) {
        sparkConf.put("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword);
        sparkConf.put("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword);
    }

    /**
     * Converts a property map into a {@link SparkConf}.
     *
     * @param conf Spark property names mapped to values
     * @return a SparkConf constructed with {@code new SparkConf(false)} and populated only from {@code conf}
     */
    static SparkConf generateSparkConf(Map<String, String> conf) {
        SparkConf sparkConf = new SparkConf(false);
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            sparkConf.set(entry.getKey(), entry.getValue());
        }
        return sparkConf;
    }
}

