package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
import org.apache.oozie.util.HCatURI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/hive-exec-2.1.1-mapr-1710-core.jar:org/apache/hadoop/hive/ql/exec/spark/SparkTask.class */
/**
 * Task that submits a {@link SparkWork} plan to a Spark session, monitors the resulting job and,
 * once the job has finished, invokes {@code jobClose} on every operator of the plan.
 *
 * <p>Return codes produced by {@link #execute(DriverContext)}:
 * <ul>
 *   <li>{@code 0} - the job monitor reported success and job close succeeded;</li>
 *   <li>{@code 1} - an exception was raised while preparing, submitting or monitoring the job;</li>
 *   <li>{@code 3} - the job succeeded but {@code jobClose} failed;</li>
 *   <li>any other value - passed through unchanged from {@code SparkJobRef.monitorJob()}.</li>
 * </ul>
 */
public class SparkTask extends Task<SparkWork> {
    private static final String CLASS_NAME = SparkTask.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
    private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
    private final PerfLogger perfLogger = SessionState.getPerfLogger();
    private static final long serialVersionUID = 1;

    /** Delegates straight to the superclass; no Spark-specific initialization is done here. */
    @Override // org.apache.hadoop.hive.ql.exec.Task
    public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext, CompilationOpContext compilationOpContext) {
        super.initialize(queryState, queryPlan, driverContext, compilationOpContext);
    }

    /**
     * Submits this task's work to a Spark session and waits for the job monitor to return.
     *
     * <p>Cleanup (clearing the cached work, closing the operators and returning the session to
     * the session manager) lives in a single {@code finally} block so that it runs exactly once
     * on every path, including when an {@link Error} propagates out of the method.
     *
     * @param driverContext driver context handed to {@code SparkSession.submit}
     * @return the task return code (see the class description)
     */
    @Override // org.apache.hadoop.hive.ql.exec.Task
    public int execute(DriverContext driverContext) {
        int rc = 0;
        SparkSession sparkSession = null;
        SparkSessionManagerImpl sparkSessionManager = null;
        try {
            printConfigInfo();
            sparkSessionManager = SparkSessionManagerImpl.getInstance();
            sparkSession = SparkUtilities.getSparkSession(this.conf, sparkSessionManager);

            SparkWork sparkWork = getWork();
            sparkWork.setRequiredCounterPrefix(getOperatorCounters());

            this.perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
            SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
            this.perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);

            addToHistory(jobRef);
            rc = jobRef.monitorJob();
            SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
            if (rc == 0) {
                SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
                if (LOG.isInfoEnabled() && sparkStatistics != null) {
                    LOG.info("=====Spark Job[{}] statistics=====", jobRef.getJobId());
                    logSparkStatistic(sparkStatistics);
                }
                LOG.info("Execution completed successfully");
            } else if (rc == 2) {
                // NOTE(review): return code 2 is the only one that triggers a cancel; presumably
                // it signals a timeout reported by the monitor - confirm against SparkJobRef.
                jobRef.cancelJob();
            }
            sparkJobStatus.cleanup();
        } catch (Exception e) {
            // NOTE(review): HCatURI.PARTITION_VALUE_QUOTE is presumably the closing single quote
            // of the message - confirm; the constant is kept so the emitted text is unchanged.
            String message = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + HCatURI.PARTITION_VALUE_QUOTE;
            console.printError(message, "\n" + StringUtils.stringifyException(e));
            LOG.error(message, e);
            rc = 1;
        } finally {
            Utilities.clearWork(this.conf);
            // Operators are closed and the session returned only if a session was obtained.
            if (sparkSession != null && sparkSessionManager != null) {
                rc = close(rc);
                try {
                    sparkSessionManager.returnSession(sparkSession);
                } catch (HiveException e) {
                    LOG.error("Failed to return the session to SessionManager", e);
                }
            }
        }
        return rc;
    }

    /**
     * Prints the Spark job id to the console and, when a session state is attached to the current
     * thread, records the id as a query property in the Hive history.
     */
    private void addToHistory(SparkJobRef sparkJobRef) {
        console.printInfo("Starting Spark Job = " + sparkJobRef.getJobId());
        SessionState sessionState = SessionState.get();
        if (sessionState != null) {
            sessionState.getHiveHistory().setQueryProperty(this.queryState.getQueryId(), HiveHistory.Keys.SPARK_JOB_ID, sparkJobRef.getJobId());
        }
    }

    /** Logs every statistic group name followed by its tab-indented {@code name: value} entries. */
    private void logSparkStatistic(SparkStatistics sparkStatistics) {
        Iterator<SparkStatisticGroup> groups = sparkStatistics.getStatisticGroups();
        while (groups.hasNext()) {
            SparkStatisticGroup group = groups.next();
            LOG.info(group.getGroupName());
            Iterator<SparkStatistic> statistics = group.getStatistics();
            while (statistics.hasNext()) {
                SparkStatistic statistic = statistics.next();
                LOG.info("\t{}: {}", statistic.getName(), statistic.getValue());
            }
        }
    }

    /**
     * Calls {@code jobClose} on every operator of every work unit, passing whether the job
     * succeeded ({@code rc == 0}).
     *
     * @param rc return code of the job so far
     * @return {@code 3} if the job had succeeded but closing an operator failed; otherwise
     *         {@code rc} unchanged
     */
    private int close(int rc) {
        try {
            for (BaseWork baseWork : getWork().getAllWork()) {
                for (Operator<?> operator : baseWork.getAllOperators()) {
                    operator.jobClose(this.conf, rc == 0);
                }
            }
        } catch (Exception e) {
            if (rc == 0) {
                // A failed job close turns an otherwise successful job into a failure.
                rc = 3;
                console.printError("Job Commit failed with exception '" + Utilities.getNameMessage(e) + HCatURI.PARTITION_VALUE_QUOTE, "\n" + StringUtils.stringifyException(e));
            } else {
                // The job already failed: keep the original return code, but do not lose the
                // secondary failure entirely.
                LOG.warn("Closing operators failed after the job had already failed with return code {}", rc, e);
            }
        }
        return rc;
    }

    /** Always reports {@code true}. */
    @Override // org.apache.hadoop.hive.ql.exec.Task
    public boolean isMapRedTask() {
        return true;
    }

    /** Always reports {@link StageType#MAPRED}. */
    @Override // org.apache.hadoop.hive.ql.exec.Task
    public StageType getType() {
        return StageType.MAPRED;
    }

    /** Returns the fixed task name {@code "SPARK"}. */
    @Override // org.apache.hadoop.hive.ql.lib.Node
    public String getName() {
        return "SPARK";
    }

    /**
     * Returns the root work units of the plan, each cast to {@link MapWork}.
     *
     * @throws ClassCastException if a root work unit is not a {@link MapWork}
     */
    @Override // org.apache.hadoop.hive.ql.exec.Task
    public Collection<MapWork> getMapWork() {
        List<MapWork> mapWorks = Lists.newArrayList();
        for (BaseWork root : getWork().getRoots()) {
            mapWorks.add((MapWork) root);
        }
        return mapWorks;
    }

    /**
     * Returns the reducer operator that follows the given map work.
     *
     * @return the reducer of the single {@link ReduceWork} child, or {@code null} when the map
     *         work does not have exactly one child or that child is not a {@link ReduceWork}
     */
    @Override // org.apache.hadoop.hive.ql.exec.Task
    public Operator<? extends OperatorDesc> getReducer(MapWork mapWork) {
        List<BaseWork> children = getWork().getChildren(mapWork);
        if (children.size() != 1) {
            return null;
        }
        BaseWork onlyChild = children.get(0);
        if (onlyChild instanceof ReduceWork) {
            return ((ReduceWork) onlyChild).getReducer();
        }
        return null;
    }

    /** Prints console hints naming the configuration properties that control reducer counts. */
    private void printConfigInfo() throws IOException {
        console.printInfo("In order to change the average load for a reducer (in bytes):");
        console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
        console.printInfo("In order to limit the maximum number of reducers:");
        console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
        console.printInfo("In order to set a constant number of reducers:");
        console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS.varname + "=<number>");
    }

    /**
     * Builds the counter names to request for this job, keyed by the configured Hive counter
     * group.
     *
     * <p>The list always contains the created-files counter and every map-operator counter; the
     * counters of each file sink, reduce sink, script and join operator found in the plan are
     * appended in plan order.
     *
     * @return a single-entry map from the counter group name to the counter names
     */
    private Map<String, List<String>> getOperatorCounters() {
        String counterGroup = HiveConf.getVar(this.conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
        Map<String, List<String>> counters = new HashMap<String, List<String>>();
        List<String> counterNames = new LinkedList<String>();
        counters.put(counterGroup, counterNames);

        counterNames.add(Operator.HIVECOUNTERCREATEDFILES);
        for (AbstractMapOperator.Counter counter : AbstractMapOperator.Counter.values()) {
            counterNames.add(counter.toString());
        }

        for (BaseWork baseWork : getWork().getAllWork()) {
            for (Operator<?> operator : baseWork.getAllOperators()) {
                if (operator instanceof FileSinkOperator) {
                    FileSinkOperator fileSink = (FileSinkOperator) operator;
                    for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) {
                        counterNames.add(fileSink.getCounterName(counter));
                    }
                } else if (operator instanceof ReduceSinkOperator) {
                    ReduceSinkOperator reduceSink = (ReduceSinkOperator) operator;
                    for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) {
                        counterNames.add(reduceSink.getCounterName(counter, this.conf));
                    }
                } else if (operator instanceof ScriptOperator) {
                    for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) {
                        counterNames.add(counter.toString());
                    }
                } else if (operator instanceof JoinOperator) {
                    for (JoinOperator.SkewkeyTableCounter counter : JoinOperator.SkewkeyTableCounter.values()) {
                        counterNames.add(counter.toString());
                    }
                }
            }
        }
        return counters;
    }
}
