package org.apache.pig.backend.hadoop.executionengine.spark;

import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.JoinGroupSparkConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.PoissonSampleConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SparkSampleSortConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.XMLSparkPrinter;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.spark.SparkCounterGroup;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.class */
/**
 * {@link Launcher} that compiles a Pig physical plan into a {@link SparkOperPlan}, optimizes it and
 * executes it on Spark through {@link JobGraphBuilder}.
 *
 * <p>The {@link JavaSparkContext} lives in a static field shared by every instance in the JVM. It is
 * created lazily by {@link #startSparkIfNeeded(PigContext)} and released by {@link #stopSpark()}; both
 * synchronize on {@code SparkLauncher.class}.
 *
 * <p>When the Spark master is local (see {@link #isLocalSparkMaster()}), job resources (jars, ship and
 * cache files) are copied into the current working directory instead of being registered with
 * {@code addJar}/{@code addFile}. Ship and cache copies are removed again after the job.
 */
public class SparkLauncher extends Launcher {
    private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
    private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName();

    /** Shared Spark context; created/stopped only while holding the SparkLauncher.class lock. */
    private static JavaSparkContext sparkContext = null;
    private static JobMetricsListener jobMetricsListener = new JobMetricsListener();

    private String jobGroupID;
    private PigContext pigContext = null;
    private JobConf jobConf = null;
    /** Absolute, normalized working directory with a trailing "/"; set by launchPig. */
    private String currentDirectoryPath = null;
    private SparkEngineConf sparkEngineConf = new SparkEngineConf();

    /** Kind of resource handed to Spark: jars go through addJar, everything else through addFile. */
    public enum ResourceType {
        JAR,
        FILE
    }

    /**
     * Compiles {@code physicalPlan} to a Spark plan, runs it on the shared Spark context and returns
     * the collected statistics.
     *
     * @param physicalPlan the physical plan to execute
     * @param str unused by this implementation
     * @param pigContext the Pig context of the running script
     * @return the {@link SparkPigStats} filled in while the job graph was executed
     * @throws Exception any failure while compiling, starting Spark or running the job graph
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.Launcher
    public PigStats launchPig(PhysicalPlan physicalPlan, String str, PigContext pigContext) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(physicalPlan);
        }
        this.pigContext = pigContext;
        initialize(physicalPlan);
        SparkOperPlan sparkPlan = compile(physicalPlan, pigContext);
        if (LOG.isDebugEnabled()) {
            LOG.debug(sparkPlan);
        }
        SparkPigStats sparkStats = (SparkPigStats) pigContext.getExecutionEngine().instantiatePigStats();
        sparkStats.initialize(pigContext, sparkPlan, this.jobConf);
        PigStats.start(sparkStats);

        startSparkIfNeeded(pigContext);
        this.jobGroupID = String.format("%s-%s", sparkContext.getConf().getAppId(), UUID.randomUUID().toString());
        this.jobConf.set(MRConfiguration.JOB_ID, this.jobGroupID);
        sparkContext.setJobGroup(this.jobGroupID, "Pig query to Spark cluster", false);
        jobMetricsListener.reset();

        // Target directory for resources when running against a local master.
        this.currentDirectoryPath = Paths.get(".").toAbsolutePath().normalize().toString() + "/";
        new ParallelismSetter(sparkPlan, this.jobConf).visit();
        prepareSparkCounters(this.jobConf);

        // Physical operator class -> converter that turns that operator into an RDD.
        HashMap converters = new HashMap();
        converters.put(POLoad.class, new LoadConverter(pigContext, physicalPlan, sparkContext.sc(), this.jobConf, this.sparkEngineConf));
        converters.put(POStore.class, new StoreConverter(this.jobConf));
        converters.put(POForEach.class, new ForEachConverter(this.jobConf));
        converters.put(POFilter.class, new FilterConverter());
        converters.put(POPackage.class, new PackageConverter());
        converters.put(POLocalRearrange.class, new LocalRearrangeConverter());
        converters.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
        converters.put(POJoinGroupSpark.class, new JoinGroupSparkConverter());
        converters.put(POLimit.class, new LimitConverter());
        converters.put(PODistinct.class, new DistinctConverter());
        converters.put(POUnion.class, new UnionConverter(sparkContext.sc()));
        converters.put(POSort.class, new SortConverter());
        converters.put(POSplit.class, new SplitConverter());
        converters.put(POSkewedJoin.class, new SkewedJoinConverter());
        converters.put(POMergeJoin.class, new MergeJoinConverter());
        converters.put(POCollectedGroup.class, new CollectedGroupConverter());
        converters.put(POCounter.class, new CounterConverter());
        converters.put(PORank.class, new RankConverter());
        converters.put(POStream.class, new StreamConverter());
        converters.put(POFRJoinSpark.class, new FRJoinConverter());
        converters.put(POMergeCogroup.class, new MergeCogroupConverter());
        converters.put(POReduceBySpark.class, new ReduceByConverter());
        converters.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
        converters.put(POBroadcastSpark.class, new BroadcastConverter(sparkContext));
        converters.put(POSampleSortSpark.class, new SparkSampleSortConverter());
        converters.put(POPoissonSampleSpark.class, new PoissonSampleConverter());

        if (ConfigurationUtil.toConfiguration(pigContext.getProperties()).getBoolean(PigConfiguration.PIG_PRINT_EXEC_PLAN, false)) {
            LOG.info(sparkPlan);
        }
        uploadResources(sparkPlan);
        new JobGraphBuilder(sparkPlan, converters, sparkStats, sparkContext, jobMetricsListener, this.jobGroupID, this.jobConf, pigContext).visit();
        cleanUpSparkJob(sparkStats);
        sparkStats.finish();
        resetUDFContext();
        return sparkStats;
    }

    /** Detaches the job configuration from the thread's UDFContext once the job is done. */
    private void resetUDFContext() {
        UDFContext.getUDFContext().addJobConf(null);
    }

    /** Makes ship/cache/UDF files and all required jars available to the Spark job. */
    private void uploadResources(SparkOperPlan sparkOperPlan) throws IOException {
        addFilesToSparkJob(sparkOperPlan);
        addJarsToSparkJob(sparkOperPlan);
    }

    /**
     * Runs the Spark plan optimizers in order: combiner, secondary key, accumulator, no-op filter
     * removal, multi-query and join/group. The first three can be switched off through properties.
     */
    private void optimize(SparkOperPlan sparkOperPlan, PigContext pigContext) throws IOException {
        Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());

        boolean noCombiner = conf.getBoolean("pig.exec.nocombiner", false);
        if (!pigContext.inIllustrator && !noCombiner) {
            new CombinerOptimizer(sparkOperPlan).visit();
            if (LOG.isDebugEnabled()) {
                LOG.debug("After combiner optimization:");
                LOG.debug(sparkOperPlan);
            }
        }

        boolean noSecondaryKey = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, false);
        if (!pigContext.inIllustrator && !noSecondaryKey) {
            new SecondaryKeyOptimizerSpark(sparkOperPlan).visit();
        }

        if (conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true)) {
            new AccumulatorOptimizer(sparkOperPlan).visit();
        }

        new NoopFilterRemover(sparkOperPlan).visit();

        boolean multiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Before multiquery optimization:");
            LOG.debug(sparkOperPlan);
        }
        if (multiQuery) {
            new MultiQueryOptimizerSpark(sparkOperPlan).visit();
        }
        new JoinGroupOptimizerSpark(sparkOperPlan).visit();
        if (LOG.isDebugEnabled()) {
            LOG.debug("After multiquery optimization:");
            LOG.debug(sparkOperPlan);
        }
    }

    /**
     * Post-job cleanup.
     *
     * <p>In local mode, this removes the ship/cache file copies made in the working directory. It then
     * invokes each store's cleanupOnSuccess or cleanupOnFailure, depending on that output's outcome.
     *
     * @throws ExecException wrapping any IOException raised by a StoreFunc cleanup hook
     */
    private void cleanUpSparkJob(SparkPigStats sparkPigStats) throws ExecException {
        LOG.info("Clean up Spark Job");
        if (isLocalSparkMaster()) {
            deleteShippedFiles(this.pigContext.getProperties().getProperty("pig.streaming.ship.files"));
            deleteCachedFiles(this.pigContext.getProperties().getProperty("pig.streaming.cache.files"));
        }
        for (OutputStats outputStats : sparkPigStats.getOutputStats()) {
            POStore store = outputStats.getPOStore();
            try {
                if (outputStats.isSuccessful()) {
                    store.getStoreFunc().cleanupOnSuccess(store.getSFile().getFileName(), Job.getInstance(outputStats.getConf()));
                } else {
                    store.getStoreFunc().cleanupOnFailure(store.getSFile().getFileName(), Job.getInstance(outputStats.getConf()));
                }
            } catch (IOException e) {
                throw new ExecException(e);
            } catch (AbstractMethodError ignored) {
                // The StoreFunc class provides no implementation of the cleanup hook (presumably built
                // against an older interface); treat that as "nothing to clean up".
            }
        }
    }

    /**
     * Deletes the working-directory copies of the comma-separated ship files.
     *
     * <p>Entries are trimmed exactly as {@link #shipFiles(String)} trims them. Entries whose source does
     * not exist were never shipped and are skipped. Sources that already live in the working directory
     * were never copied and are skipped as well, so the user's original file is not removed.
     */
    private void deleteShippedFiles(String shipFiles) {
        if (shipFiles == null) {
            return;
        }
        for (String entry : shipFiles.split(",")) {
            File source = new File(entry.trim());
            File shipped = new File(this.currentDirectoryPath, source.getName());
            if (!source.exists() || isSameLocation(source, shipped)) {
                continue;
            }
            if (shipped.exists()) {
                LOG.info(String.format("Delete ship file result: %b", shipped.delete()));
            }
        }
    }

    /**
     * Deletes the working-directory copies of the comma-separated {@code url#name} cache files.
     * Entries without a valid {@code #name} part were never copied and are skipped.
     */
    private void deleteCachedFiles(String cacheFiles) {
        if (cacheFiles == null) {
            return;
        }
        for (String entry : cacheFiles.split(",")) {
            String fileName = extractFileName(entry.trim());
            if (fileName == null) {
                continue;
            }
            File cached = new File(this.currentDirectoryPath, fileName);
            if (cached.exists()) {
                LOG.info(String.format("Delete cache file result: %b", cached.delete()));
            }
        }
    }

    /** Ships the streaming ship/cache files from the Pig properties, then the UDF-declared resources. */
    private void addFilesToSparkJob(SparkOperPlan sparkOperPlan) throws IOException {
        LOG.info("Add files Spark Job");
        shipFiles(this.pigContext.getProperties().getProperty("pig.streaming.ship.files"));
        cacheFiles(this.pigContext.getProperties().getProperty("pig.streaming.cache.files"));
        addUdfResourcesToSparkJob(sparkOperPlan);
    }

    /** Collects ship/cache files requested by UDFs in the plan and ships them like script-level ones. */
    private void addUdfResourcesToSparkJob(SparkOperPlan sparkOperPlan) throws IOException {
        SparkPOUserFuncVisitor udfVisitor = new SparkPOUserFuncVisitor(sparkOperPlan);
        udfVisitor.visit();
        Joiner commaJoiner = Joiner.on(",");
        shipFiles(commaJoiner.join(udfVisitor.getShipFiles()));
        cacheFiles(commaJoiner.join(udfVisitor.getCacheFiles()));
    }

    /**
     * Adds every existing local file from a comma-separated list to the job. Names ending in ".jar" are
     * added as jars; everything else is added as a plain file.
     */
    private void shipFiles(String shipFiles) throws IOException {
        if (shipFiles == null || shipFiles.isEmpty()) {
            return;
        }
        for (String entry : shipFiles.split(",")) {
            File file = new File(entry.trim());
            if (file.exists()) {
                ResourceType type = file.getName().endsWith(".jar") ? ResourceType.JAR : ResourceType.FILE;
                addResourceToSparkJobWorkingDirectory(file, file.getName(), type);
            }
        }
    }

    /**
     * Downloads each {@code url#name} entry into a fresh local temp directory and adds it to the job
     * under {@code name}. Malformed entries (no single '#') are ignored.
     */
    private void cacheFiles(String cacheFiles) throws IOException {
        if (cacheFiles == null || cacheFiles.isEmpty()) {
            return;
        }
        File tmpFolder = Files.createTempDirectory("cache", new FileAttribute[0]).toFile();
        tmpFolder.deleteOnExit();
        for (String entry : cacheFiles.split(",")) {
            String trimmed = entry.trim();
            String fileName = extractFileName(trimmed);
            String fileUrl = extractFileUrl(trimmed);
            if (fileName == null || fileUrl == null) {
                continue;
            }
            Path src = new Path(fileUrl);
            File tmpFile = new File(tmpFolder, fileName);
            Path dst = new Path(tmpFile.getAbsolutePath());
            // Resolve the FileSystem from the source: the destination is always local, and a local
            // FileSystem cannot open a remote (e.g. hdfs://) source.
            src.getFileSystem(this.jobConf).copyToLocalFile(src, dst);
            tmpFile.deleteOnExit();
            LOG.info(String.format("CacheFile:%s", fileName));
            addResourceToSparkJobWorkingDirectory(tmpFile, fileName, ResourceType.FILE);
        }
    }

    /**
     * Adds the default Pig jars, script jars, UDF jars, the generated script-UDF jar (if any) and the
     * extra registered jars to the job, each jar at most once.
     */
    private void addJarsToSparkJob(SparkOperPlan sparkOperPlan) throws IOException {
        HashSet<String> jars = new HashSet<String>();

        LOG.info("Add default jars to Spark Job");
        jars.addAll(JarManager.getDefaultJars());

        LOG.info("Add script jars to Spark Job");
        for (String scriptJar : this.pigContext.scriptJars) {
            jars.add(scriptJar);
        }

        LOG.info("Add udf jars to Spark Job");
        UDFJarsFinder udfJarsFinder = new UDFJarsFinder(sparkOperPlan, this.pigContext);
        udfJarsFinder.visit();
        for (String udfJar : udfJarsFinder.getUdfJars()) {
            jars.add(udfJar);
        }

        File scriptUdfJar = JarManager.createPigScriptUDFJar(this.pigContext);
        if (scriptUdfJar != null) {
            LOG.info("Add script udf jar to Spark job");
            jars.add(scriptUdfJar.getAbsolutePath());
        }

        LOG.info("Add extra jars to Spark job");
        for (URL extraJar : this.pigContext.extraJars) {
            jars.add(extraJar.getFile());
        }

        for (String jar : jars) {
            File jarFile = new File(jar);
            addResourceToSparkJobWorkingDirectory(jarFile, jarFile.getName(), ResourceType.JAR);
        }
    }

    /**
     * Makes {@code resourcePath} available to the job under {@code resourceName}.
     *
     * <p>On a non-local master the resource is registered with addJar/addFile. On a local master it is
     * copied into the working directory, replacing any existing file of that name, unless it already is
     * that file.
     */
    private void addResourceToSparkJobWorkingDirectory(File resourcePath, String resourceName, ResourceType resourceType) throws IOException {
        if (resourceType == ResourceType.JAR) {
            LOG.info("Added jar " + resourceName);
        } else {
            LOG.info("Added file " + resourceName);
        }

        if (!isLocalSparkMaster()) {
            if (resourceType == ResourceType.JAR) {
                sparkContext.addJar(resourcePath.toURI().toURL().toExternalForm());
            } else if (resourceType == ResourceType.FILE) {
                sparkContext.addFile(resourcePath.toURI().toURL().toExternalForm());
            }
            return;
        }

        File target = new File(this.currentDirectoryPath, resourceName);
        // Normalized comparison so that "./x" and "x" are recognized as the same file; otherwise the
        // delete below would remove the source before copying it onto itself.
        if (resourcePath.exists() && isSameLocation(resourcePath, target)) {
            return;
        }
        synchronized (SparkLauncher.class) {
            if (target.exists()) {
                LOG.info(String.format("Jar file %s exists, ready to delete", target.getAbsolutePath()));
                target.delete();
            } else {
                LOG.info(String.format("Jar file %s not exists,", target.getAbsolutePath()));
            }
            Files.copy(resourcePath.toPath(), target.toPath());
        }
    }

    /** True when both files resolve to the same absolute, normalized path (no symlink resolution). */
    private static boolean isSameLocation(File first, File second) {
        return first.toPath().toAbsolutePath().normalize()
                .equals(second.toPath().toAbsolutePath().normalize());
    }

    /**
     * True when SPARK_MASTER is unset or equals "local" (case-insensitive); resources are then copied
     * to the working directory instead of being registered with the Spark context.
     */
    private static boolean isLocalSparkMaster() {
        String master = System.getenv("SPARK_MASTER");
        return master == null || master.equalsIgnoreCase("LOCAL");
    }

    /** Returns {@code name} from {@code url#name}, or null unless the entry splits into exactly two parts. */
    private String extractFileName(String entry) {
        String[] parts = entry.split("#");
        return parts.length == 2 ? parts[1] : null;
    }

    /** Returns {@code url} from {@code url#name}, or null unless the entry splits into exactly two parts. */
    private String extractFileUrl(String entry) {
        String[] parts = entry.split("#");
        return parts.length == 2 ? parts[0] : null;
    }

    /**
     * Compiles a physical plan into an annotated and optimized {@link SparkOperPlan}.
     *
     * @param physicalPlan the plan to compile
     * @param pigContext supplies the properties that drive the optimizers
     * @return the optimized Spark plan
     */
    public SparkOperPlan compile(PhysicalPlan physicalPlan, PigContext pigContext) throws PlanException, IOException, VisitorException {
        SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan, pigContext);
        sparkCompiler.compile();
        sparkCompiler.connectSoftLink();
        SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
        new SparkPOPackageAnnotator(sparkPlan).visit();
        optimize(sparkPlan, pigContext);
        return sparkPlan;
    }

    /**
     * Creates the shared Spark context if none exists yet.
     *
     * <p>The master is "local" for local exec types; otherwise it is SPARK_MASTER, falling back to
     * "local". Every "spark.*" Pig property is copied into the SparkConf.
     *
     * @throws PigException if a non-local, non-yarn-client master is used without SPARK_HOME
     */
    private static synchronized void startSparkIfNeeded(PigContext pigContext) throws PigException {
        if (sparkContext != null) {
            return;
        }
        String master;
        if (pigContext.getExecType().isLocal()) {
            master = HExecutionEngine.LOCAL;
        } else {
            master = System.getenv("SPARK_MASTER");
            if (master == null) {
                LOG.info("SPARK_MASTER not specified, using \"local\"");
                master = HExecutionEngine.LOCAL;
            }
        }

        String sparkHome = System.getenv("SPARK_HOME");
        if (!master.startsWith(HExecutionEngine.LOCAL) && !master.equals("yarn-client") && sparkHome == null) {
            System.err.println("You need to set SPARK_HOME to run on a Mesos cluster!");
            throw new PigException("SPARK_HOME is not set");
        }

        SparkConf sparkConf = new SparkConf();
        Properties pigProperties = pigContext.getProperties();
        sparkConf.setMaster(master);
        sparkConf.setAppName(pigProperties.getProperty(PigContext.JOB_NAME, "pig"));
        sparkConf.set("spark.rpc.useNettyFileServer", pigProperties.getProperty(PigConfiguration.PIG_SPARK_USE_NETTY_FILESERVER, "false"));
        if (sparkHome == null || sparkHome.isEmpty()) {
            LOG.warn("SPARK_HOME is not set");
        } else {
            sparkConf.setSparkHome(sparkHome);
        }

        for (String key : pigProperties.stringPropertyNames()) {
            if (key.startsWith("spark.")) {
                LOG.debug("Copying key " + key + " with value " + pigProperties.getProperty(key) + " to SparkConf");
                sparkConf.set(key, pigProperties.getProperty(key));
            }
        }

        // Set after the copy loop, so this always overrides a user-provided value.
        sparkConf.set("spark.executor.userClassPathFirst", "true");
        checkAndConfigureDynamicAllocation(master, sparkConf);

        sparkContext = new JavaSparkContext(sparkConf);
        sparkContext.sc().addSparkListener(new StatsReportListener());
        sparkContext.sc().addSparkListener(new JobLogger());
        sparkContext.sc().addSparkListener(jobMetricsListener);
    }

    /**
     * When dynamic allocation is enabled, warns if the master is not yarn. It also turns on the
     * shuffle service if that was not already enabled.
     */
    private static void checkAndConfigureDynamicAllocation(String master, SparkConf sparkConf) {
        if (!sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
            return;
        }
        if (!master.startsWith("yarn")) {
            LOG.warn("Dynamic allocation is enabled, but script isn't running on yarn. Ignoring ...");
        }
        if (!sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
            LOG.info("Spark shuffle service is being enabled as dynamic allocation is enabled");
            sparkConf.set("spark.shuffle.service.enabled", "true");
        }
    }

    /**
     * Stops and forgets the shared Spark context, if any. It synchronizes on the same lock as
     * {@link #startSparkIfNeeded(PigContext)}.
     */
    static synchronized void stopSpark() {
        if (sparkContext != null) {
            sparkContext.stop();
            sparkContext = null;
        }
    }

    /**
     * Compiles the plan and prints it.
     *
     * @param str output format: "text", "dot" or "xml"
     * @param z verbose output (used by the text and dot printers)
     * @throws IOException for an unsupported format or when the plan cannot be printed
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.Launcher
    public void explain(PhysicalPlan physicalPlan, PigContext pigContext, PrintStream printStream, String str, boolean z) throws IOException {
        explain(compile(physicalPlan, pigContext), printStream, str, z);
    }

    /** Prints an already compiled Spark plan in the requested format. */
    private void explain(SparkOperPlan sparkOperPlan, PrintStream printStream, String format, boolean verbose) throws IOException {
        if (format.equals("text")) {
            // Adjacency list (sorted by operator key) followed by the full plan dump.
            List<OperatorKey> keys = new ArrayList<OperatorKey>(sparkOperPlan.getKeys().keySet());
            Collections.sort(keys);
            for (OperatorKey key : keys) {
                SparkOperator operator = sparkOperPlan.getOperator(key);
                printStream.print(operator.getOperatorKey());
                List<SparkOperator> successors = sparkOperPlan.getSuccessors(operator);
                if (successors != null) {
                    printStream.print("->");
                    for (SparkOperator successor : successors) {
                        printStream.print(successor.getOperatorKey() + " ");
                    }
                }
                printStream.println();
            }
            SparkPrinter sparkPrinter = new SparkPrinter(printStream, sparkOperPlan);
            sparkPrinter.setVerbose(verbose);
            sparkPrinter.visit();
            return;
        }
        if (format.equals("dot")) {
            printStream.println("#--------------------------------------------------");
            printStream.println("# Spark Plan");
            printStream.println("#--------------------------------------------------");
            DotSparkPrinter dotSparkPrinter = new DotSparkPrinter(sparkOperPlan, printStream);
            dotSparkPrinter.setVerbose(verbose);
            dotSparkPrinter.dump();
            printStream.println("");
            return;
        }
        if (!format.equals("xml")) {
            throw new IOException("Unsupported explain format. Supported formats are: text, dot, xml");
        }
        try {
            XMLSparkPrinter xmlSparkPrinter = new XMLSparkPrinter(printStream, sparkOperPlan);
            xmlSparkPrinter.visit();
            xmlSparkPrinter.closePlan();
        } catch (ParserConfigurationException | TransformerException e) {
            // Surface the failure instead of silently emitting a partial plan.
            throw new IOException("Failed to print the Spark plan as XML", e);
        }
    }

    /** Stops the shared Spark context, if any. */
    @Override // org.apache.pig.backend.hadoop.executionengine.Launcher
    public void kill() throws BackendException {
        stopSpark();
    }

    /** Stops the whole shared Spark context; the job id and configuration arguments are not consulted. */
    @Override // org.apache.pig.backend.hadoop.executionengine.Launcher
    public void killJob(String str, Configuration configuration) throws BackendException {
        stopSpark();
    }

    /** Records Pig's UDF package import list (comma-joined) in the engine configuration. */
    private void saveUdfImportList() {
        this.sparkEngineConf.setSparkUdfImportListStr(Joiner.on(",").join(PigContext.getPackageImportList()));
    }

    /**
     * Builds the job configuration for {@code physicalPlan} and initializes the per-job global state:
     * the schema tuple backend, default time zone, the thread-local job conf, and the optional default
     * parallelism taken from "spark.default.parallelism".
     */
    private void initialize(PhysicalPlan physicalPlan) throws IOException {
        saveUdfImportList();
        this.jobConf = SparkUtil.newJobConf(this.pigContext, physicalPlan, this.sparkEngineConf);
        SchemaTupleBackend.initialize((Configuration) this.jobConf, this.pigContext);
        Utils.setDefaultTimeZone(this.jobConf);
        PigMapReduce.sJobConfInternal.set(this.jobConf);
        String defaultParallelism = this.pigContext.getProperties().getProperty("spark.default.parallelism");
        if (defaultParallelism != null) {
            // NOTE(review): get() presumably makes sure the singleton exists before the static setter runs.
            SparkPigContext.get();
            SparkPigContext.setDefaultParallelism(Integer.parseInt(defaultParallelism));
        }
    }

    /**
     * Creates the job's Spark counters and registers them with the status reporter. It also serializes
     * them into "pig.spark.counters". When "aggregate.warning" is "true", it first adds a warning group
     * with SPARK_WARN and SPARK_CUSTOM_WARN map counters.
     */
    private static void prepareSparkCounters(JobConf jobConf) throws IOException {
        SparkPigStatusReporter statusReporter = SparkPigStatusReporter.getInstance();
        SparkCounters sparkCounters = new SparkCounters(sparkContext);
        if ("true".equalsIgnoreCase(jobConf.get("aggregate.warning"))) {
            SparkCounterGroup.MapSparkCounterGroup warningGroup = new SparkCounterGroup.MapSparkCounterGroup(PIG_WARNING_FQCN, PIG_WARNING_FQCN, sparkContext);
            warningGroup.createCounter(PigWarning.SPARK_WARN.name(), new HashMap<String, Long>());
            warningGroup.createCounter(PigWarning.SPARK_CUSTOM_WARN.name(), new HashMap<String, Long>());
            sparkCounters.getSparkCounterGroups().put(PIG_WARNING_FQCN, warningGroup);
        }
        statusReporter.setCounters(sparkCounters);
        jobConf.set("pig.spark.counters", ObjectSerializer.serialize(sparkCounters));
    }
}
