/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
import org.apache.spark.SparkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkTask
extends Task<SparkWork> {
    private static final String CLASS_NAME = SparkTask.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger((String)CLASS_NAME);
    private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
    private PerfLogger perfLogger;
    private static final long serialVersionUID = 1L;
    private transient int sparkJobID;
    private transient String sparkJobHandleId;
    private transient SparkStatistics sparkStatistics;
    private transient long submitTime;
    private transient long startTime;
    private transient long finishTime;
    private transient int succeededTaskCount;
    private transient int totalTaskCount;
    private transient int failedTaskCount;
    private transient List<Integer> stageIds;
    private transient SparkJobRef jobRef = null;
    private transient boolean isShutdown = false;
    private transient boolean jobKilled = false;

    @Override
    public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext, CompilationOpContext opContext) {
        super.initialize(queryState, queryPlan, driverContext, opContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int execute(DriverContext driverContext) {
        int rc = 0;
        this.perfLogger = SessionState.getPerfLogger();
        SparkSession sparkSession = null;
        SparkSessionManagerImpl sparkSessionManager = null;
        try {
            this.printConfigInfo();
            sparkSessionManager = SparkSessionManagerImpl.getInstance();
            sparkSession = SparkUtilities.getSparkSession(this.conf, sparkSessionManager);
            SparkWork sparkWork = (SparkWork)this.getWork();
            sparkWork.setRequiredCounterPrefix(this.getOperatorCounters());
            this.perfLogger.PerfLogBegin(CLASS_NAME, "SparkSubmitJob");
            this.submitTime = this.perfLogger.getStartTime("SparkSubmitJob");
            this.jobRef = sparkSession.submit(driverContext, sparkWork);
            this.perfLogger.PerfLogEnd(CLASS_NAME, "SparkSubmitJob");
            if (driverContext.isShutdown()) {
                LOG.warn("Killing Spark job");
                this.killJob();
                throw new HiveException("Operation is cancelled.");
            }
            this.sparkJobHandleId = this.jobRef.getJobId();
            this.addToHistory(HiveHistory.Keys.SPARK_JOB_HANDLE_ID, this.jobRef.getJobId());
            LOG.debug("Starting Spark job with job handle id " + this.sparkJobHandleId);
            this.jobID = this.jobRef.getSparkJobStatus().getAppID();
            rc = this.jobRef.monitorJob();
            this.sparkJobID = this.jobRef.getSparkJobStatus().getJobId();
            this.addToHistory(HiveHistory.Keys.SPARK_JOB_ID, Integer.toString(this.sparkJobID));
            SparkJobStatus sparkJobStatus = this.jobRef.getSparkJobStatus();
            this.getSparkJobInfo(sparkJobStatus, rc);
            if (rc == 0) {
                this.sparkStatistics = sparkJobStatus.getSparkStatistics();
                this.printExcessiveGCWarning();
                if (LOG.isInfoEnabled() && this.sparkStatistics != null) {
                    LOG.info(SparkTask.sparkStatisticsToString(this.sparkStatistics, this.sparkJobID));
                }
                LOG.info("Successfully completed Spark job[" + this.sparkJobID + "] with application ID " + this.jobID + " and task ID " + this.getId());
            } else if (rc == 2) {
                LOG.debug("Failed to submit Spark job with job handle id " + this.sparkJobHandleId);
                LOG.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(this.jobID) ? "UNKNOWN" : this.jobID));
                this.killJob();
            } else if (rc == 4) {
                LOG.info("The spark job or one stage of it has too many tasks. Cancelling Spark job " + this.sparkJobID + " with application ID " + this.jobID);
                this.killJob();
            }
            if (this.jobID == null) {
                this.jobID = sparkJobStatus.getAppID();
            }
            sparkJobStatus.cleanup();
        }
        catch (Exception e) {
            String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
            console.printError(msg, "\n" + StringUtils.stringifyException((Throwable)e));
            LOG.error(msg, (Throwable)e);
            this.setException(e);
            if (e instanceof HiveException) {
                HiveException he = (HiveException)e;
                rc = he.getCanonicalErrorMsg().getErrorCode();
            } else {
                rc = 1;
            }
        }
        finally {
            this.startTime = this.perfLogger.getEndTime("SparkSubmitToRunning");
            if (this.startTime < this.submitTime) {
                this.startTime = this.submitTime;
            }
            this.finishTime = this.perfLogger.getEndTime("SparkRunJob");
            Utilities.clearWork(this.conf);
            if (sparkSession != null && sparkSessionManager != null) {
                rc = this.close(rc);
                try {
                    sparkSessionManager.returnSession(sparkSession);
                }
                catch (HiveException ex) {
                    LOG.error("Failed to return the session to SessionManager", (Throwable)ex);
                }
            }
        }
        return rc;
    }

    private void printExcessiveGCWarning() {
        SparkStatisticGroup sparkStatisticGroup = this.sparkStatistics.getStatisticGroup("SPARK");
        if (sparkStatisticGroup != null) {
            double threshold;
            long taskDurationTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic("TaskDurationTime").getValue());
            long jvmGCTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic("JvmGCTime").getValue());
            if ((double)jvmGCTime > (double)taskDurationTime * (threshold = 0.1)) {
                long percentGcTime = Math.round((double)jvmGCTime / (double)taskDurationTime * 100.0);
                String gcWarning = String.format("WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of task time in GC", this.sparkJobID, percentGcTime, jvmGCTime, taskDurationTime);
                console.printInfo(gcWarning);
            }
        }
    }

    private void addToHistory(HiveHistory.Keys key, String value) {
        if (SessionState.get() != null) {
            SessionState.get().getHiveHistory().setQueryProperty(this.queryState.getQueryId(), key, value);
        }
    }

    @VisibleForTesting
    static String sparkStatisticsToString(SparkStatistics sparkStatistic, int sparkJobID) {
        StringBuilder sparkStatsString = new StringBuilder();
        sparkStatsString.append("\n\n");
        sparkStatsString.append(String.format("=====Spark Job[%d] Statistics=====", sparkJobID));
        sparkStatsString.append("\n\n");
        Iterator<SparkStatisticGroup> groupIterator = sparkStatistic.getStatisticGroups();
        while (groupIterator.hasNext()) {
            SparkStatisticGroup group = groupIterator.next();
            sparkStatsString.append(group.getGroupName()).append("\n");
            Iterator<SparkStatistic> statisticIterator = group.getStatistics();
            while (statisticIterator.hasNext()) {
                SparkStatistic statistic = statisticIterator.next();
                sparkStatsString.append("\t").append(statistic.getName()).append(": ").append(statistic.getValue()).append("\n");
            }
        }
        return sparkStatsString.toString();
    }

    private int close(int rc) {
        block4: {
            try {
                List<BaseWork> ws = ((SparkWork)this.work).getAllWork();
                for (BaseWork w : ws) {
                    for (Operator<?> op : w.getAllOperators()) {
                        op.jobClose(this.conf, rc == 0);
                    }
                }
            }
            catch (Exception e) {
                if (rc != 0) break block4;
                rc = 3;
                String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
                console.printError(mesg, "\n" + StringUtils.stringifyException((Throwable)e));
                this.setException(e);
            }
        }
        return rc;
    }

    @Override
    public void updateTaskMetrics(Metrics metrics) {
        metrics.incrementCounter("hive_spark_tasks");
    }

    @Override
    public boolean isMapRedTask() {
        return true;
    }

    @Override
    public StageType getType() {
        return StageType.MAPRED;
    }

    @Override
    public String getName() {
        return "SPARK";
    }

    @Override
    public Collection<MapWork> getMapWork() {
        ArrayList<MapWork> result = Lists.newArrayList();
        for (BaseWork w : ((SparkWork)this.getWork()).getRoots()) {
            result.add((MapWork)w);
        }
        return result;
    }

    @Override
    public Operator<? extends OperatorDesc> getReducer(MapWork mapWork) {
        List<BaseWork> children = ((SparkWork)this.getWork()).getChildren(mapWork);
        if (children.size() != 1) {
            return null;
        }
        if (!(children.get(0) instanceof ReduceWork)) {
            return null;
        }
        return ((ReduceWork)children.get(0)).getReducer();
    }

    public int getSparkJobID() {
        return this.sparkJobID;
    }

    public SparkStatistics getSparkStatistics() {
        return this.sparkStatistics;
    }

    public int getSucceededTaskCount() {
        return this.succeededTaskCount;
    }

    public int getTotalTaskCount() {
        return this.totalTaskCount;
    }

    public int getFailedTaskCount() {
        return this.failedTaskCount;
    }

    public List<Integer> getStageIds() {
        return this.stageIds;
    }

    public long getStartTime() {
        return this.startTime;
    }

    public long getSubmitTime() {
        return this.submitTime;
    }

    public long getFinishTime() {
        return this.finishTime;
    }

    public boolean isTaskShutdown() {
        return this.isShutdown;
    }

    @Override
    public void shutdown() {
        super.shutdown();
        this.killJob();
        this.isShutdown = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void killJob() {
        LOG.debug("Killing Spark job with job handle id " + this.sparkJobHandleId);
        boolean needToKillJob = false;
        if (this.jobRef != null && !this.jobKilled) {
            SparkTask sparkTask = this;
            synchronized (sparkTask) {
                if (!this.jobKilled) {
                    this.jobKilled = true;
                    needToKillJob = true;
                }
            }
        }
        if (needToKillJob) {
            try {
                this.jobRef.cancelJob();
            }
            catch (Exception e) {
                LOG.warn("Failed to kill Spark job", (Throwable)e);
            }
        }
    }

    private void printConfigInfo() throws IOException {
        console.printInfo("In order to change the average load for a reducer (in bytes):");
        console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
        console.printInfo("In order to limit the maximum number of reducers:");
        console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
        console.printInfo("In order to set a constant number of reducers:");
        console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
    }

    /*
     * WARNING - void declaration
     */
    private Map<String, List<String>> getOperatorCounters() {
        String groupName = HiveConf.getVar(this.conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
        HashMap<String, List<String>> counters = new HashMap<String, List<String>>();
        LinkedList<String> hiveCounters = new LinkedList<String>();
        counters.put(groupName, hiveCounters);
        hiveCounters.add("CREATED_FILES");
        for (AbstractMapOperator.Counter counter : AbstractMapOperator.Counter.values()) {
            hiveCounters.add(counter.toString());
        }
        SparkWork sparkWork = (SparkWork)this.getWork();
        for (BaseWork work : sparkWork.getAllWork()) {
            for (Operator<?> operator : work.getAllOperators()) {
                if (operator instanceof FileSinkOperator) {
                    for (Enum enum_ : FileSinkOperator.Counter.values()) {
                        hiveCounters.add(((FileSinkOperator)operator).getCounterName((FileSinkOperator.Counter)enum_));
                    }
                    continue;
                }
                if (operator instanceof ReduceSinkOperator) {
                    void var12_17;
                    String contextName = this.conf.get("__hive.context.name", "");
                    Operator.Counter[] counterArray = Operator.Counter.values();
                    int n = counterArray.length;
                    boolean bl = false;
                    while (var12_17 < n) {
                        Operator.Counter counter2 = counterArray[var12_17];
                        hiveCounters.add(Utilities.getVertexCounterName(counter2.name(), contextName));
                        ++var12_17;
                    }
                    continue;
                }
                if (operator instanceof ScriptOperator) {
                    for (Enum enum_ : ScriptOperator.Counter.values()) {
                        hiveCounters.add(enum_.toString());
                    }
                    continue;
                }
                if (!(operator instanceof JoinOperator)) continue;
                for (Enum enum_ : JoinOperator.SkewkeyTableCounter.values()) {
                    hiveCounters.add(enum_.toString());
                }
            }
        }
        return counters;
    }

    private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
        try {
            Throwable error;
            this.stageIds = new ArrayList<Integer>();
            int[] ids = sparkJobStatus.getStageIds();
            if (ids != null) {
                for (int stageId : ids) {
                    this.stageIds.add(stageId);
                }
            }
            Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
            int sumTotal = 0;
            int sumComplete = 0;
            int sumFailed = 0;
            for (String s : progressMap.keySet()) {
                SparkStageProgress progress = progressMap.get(s);
                int complete = progress.getSucceededTaskCount();
                int total = progress.getTotalTaskCount();
                int failed = progress.getFailedTaskCount();
                sumTotal += total;
                sumComplete += complete;
                sumFailed += failed;
            }
            this.succeededTaskCount = sumComplete;
            this.totalTaskCount = sumTotal;
            this.failedTaskCount = sumFailed;
            if (rc != 0 && (error = sparkJobStatus.getError()) != null) {
                if (error instanceof InterruptedException || error instanceof HiveException && error.getCause() instanceof InterruptedException) {
                    LOG.info("Killing Spark job since query was interrupted");
                    this.killJob();
                }
                HiveException he = this.isOOMError(error) ? new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM) : new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
                this.setException(he);
            }
        }
        catch (Exception e) {
            LOG.error("Failed to get Spark job information", (Throwable)e);
        }
    }

    private boolean isOOMError(Throwable error) {
        while (error != null) {
            if (error instanceof OutOfMemoryError) {
                return true;
            }
            if (error instanceof SparkException) {
                String sts = Throwables.getStackTraceAsString(error);
                return sts.contains("Container killed by YARN for exceeding memory limits");
            }
            error = error.getCause();
        }
        return false;
    }
}

