/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.piggybank.storage;

import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Stack;
import javax.xml.parsers.SAXParserFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.DefaultJobHistoryParser;
import org.apache.hadoop.mapred.JobHistory;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

public class HadoopJobHistoryLoader
extends LoadFunc {
    /** Logger shared by the loader and its nested helper classes. */
    private static final Log LOG = LogFactory.getLog(HadoopJobHistoryLoader.class);
    /** Record reader assigned in {@link #prepareToRead}; {@link #getNext} pulls {@link MRJobInfo} values from it. */
    private RecordReader<Text, MRJobInfo> reader;
    /** Name of the counter group queried for per-task input record counts. */
    public static final String TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
    /** Counter name read for MAP tasks when computing min/max input rows. */
    public static final String MAP_INPUT_RECORDS = "MAP_INPUT_RECORDS";
    /** Counter name read for REDUCE tasks when computing min/max input rows. */
    public static final String REDUCE_INPUT_RECORDS = "REDUCE_INPUT_RECORDS";
    /**
     * Maps a job-configuration property name to the key under which its value is
     * reported; populated by the static initializer and consulted by the job XML handler.
     */
    private static final Map<String, String> XML_KEYS = new HashMap<String, String>();

    /**
     * Supplies the input format used to read job history files.
     *
     * @return a freshly created {@link HadoopJobHistoryInputFormat}
     * @throws IOException declared by the signature; this implementation does not throw it
     */
    @Override
    public InputFormat getInputFormat() throws IOException {
        InputFormat historyFormat = new HadoopJobHistoryInputFormat();
        return historyFormat;
    }

    /**
     * Reads the next record from the underlying reader and converts it to a tuple.
     *
     * <p>The tuple has three fields, each a {@code Map<String, String>}:
     * field 0 is the job map, field 1 the map-task map and field 2 the
     * reduce-task map of the current {@link MRJobInfo}.
     *
     * @return the next tuple, or {@code null} when the reader has no more records
     * @throws IOException if the reader fails, or if the thread is interrupted while
     *         reading (the interrupt flag is restored before the exception is thrown)
     */
    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!this.reader.nextKeyValue()) {
                return null;
            }
            MRJobInfo info = this.reader.getCurrentValue();
            Tuple tuple = DefaultTupleFactory.getInstance().newTuple(3);
            tuple.set(0, info.job);
            tuple.set(1, info.mapTask);
            tuple.set(2, info.reduceTask);
            return tuple;
        } catch (InterruptedException e) {
            // Wrapping alone would swallow the interrupt; restore the flag so code
            // further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Stores the record reader that {@link #getNext()} will pull records from.
     *
     * @param reader the reader to use; must be a {@link HadoopJobHistoryReader},
     *        otherwise a {@link ClassCastException} is thrown
     * @param split the split being read (not used here)
     * @throws IOException declared by the signature; this implementation does not throw it
     */
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        HadoopJobHistoryReader historyReader = (HadoopJobHistoryReader) reader;
        this.reader = historyReader;
    }

    /**
     * Registers the input location on the job and installs
     * {@link JobHistoryPathFilter} as the job's input path filter.
     *
     * @param location input path string handed to {@code FileInputFormat.setInputPaths}
     * @param job the job being configured
     * @throws IOException if the input paths cannot be set
     */
    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
        FileInputFormat.setInputPathFilter(job, JobHistoryPathFilter.class);
    }

    /**
     * Refills {@code value} from an already-parsed job history.
     *
     * <p>The job map is cleared and repopulated from the job-level values, then the
     * map-task and reduce-task maps are cleared and repopulated with statistics
     * derived from all tasks of the job.
     *
     * @param jobConf configuration (not used by this method)
     * @param jobInfo parsed job history to read from
     * @param value   holder whose three maps are overwritten
     */
    public static void parseJobHistory(Configuration jobConf, JobHistory.JobInfo jobInfo, MRJobInfo value) {
        Map<String, String> jobFields = value.job;
        jobFields.clear();
        populateJob(jobInfo.getValues(), jobFields);

        value.mapTask.clear();
        value.reduceTask.clear();
        populateMapReduceTaskLists(value, jobInfo.getAllTasks());
    }

    /**
     * Copies the job-level history values that the loader exposes into {@code job}.
     *
     * <p>Each recognised {@code JobHistory.Keys} entry is stored under the name of the
     * matching {@link JobKeys} constant ({@code JOB_STATUS} is stored as
     * {@code STATUS}). The {@code COUNTERS} entry is expanded into one map entry per
     * counter. Any other key is only logged at debug level.
     *
     * @param jobC job-level values keyed by history key
     * @param job  destination map that receives the selected values
     */
    private static void populateJob(Map<JobHistory.Keys, String> jobC, Map<String, String> job) {
        for (Map.Entry<JobHistory.Keys, String> entry : jobC.entrySet()) {
            JobHistory.Keys key = entry.getKey();
            String value = entry.getValue();
            switch (key) {
                case JOBTRACKERID:
                    job.put(JobKeys.JOBTRACKERID.toString(), value);
                    break;
                case FINISH_TIME:
                    job.put(JobKeys.FINISH_TIME.toString(), value);
                    break;
                case JOBID:
                    job.put(JobKeys.JOBID.toString(), value);
                    break;
                case JOBNAME:
                    job.put(JobKeys.JOBNAME.toString(), value);
                    break;
                case USER:
                    job.put(JobKeys.USER.toString(), value);
                    break;
                case JOBCONF:
                    job.put(JobKeys.JOBCONF.toString(), value);
                    break;
                case SUBMIT_TIME:
                    job.put(JobKeys.SUBMIT_TIME.toString(), value);
                    break;
                case LAUNCH_TIME:
                    job.put(JobKeys.LAUNCH_TIME.toString(), value);
                    break;
                case TOTAL_MAPS:
                    job.put(JobKeys.TOTAL_MAPS.toString(), value);
                    break;
                case TOTAL_REDUCES:
                    job.put(JobKeys.TOTAL_REDUCES.toString(), value);
                    break;
                case FAILED_MAPS:
                    job.put(JobKeys.FAILED_MAPS.toString(), value);
                    break;
                case FAILED_REDUCES:
                    job.put(JobKeys.FAILED_REDUCES.toString(), value);
                    break;
                case FINISHED_MAPS:
                    job.put(JobKeys.FINISHED_MAPS.toString(), value);
                    break;
                case FINISHED_REDUCES:
                    job.put(JobKeys.FINISHED_REDUCES.toString(), value);
                    break;
                case JOB_STATUS:
                    job.put(JobKeys.STATUS.toString(), value);
                    break;
                case COUNTERS:
                    // The counter string is passed through unchanged. A previous
                    // value.concat(",") here discarded its result and was a no-op.
                    parseAndAddJobCounters(job, value);
                    break;
                default:
                    LOG.debug("JobHistory.Keys." + key + " : NOT INCLUDED IN LOADER RETURN VALUE");
                    break;
            }
        }
    }

    /**
     * Parses an escaped compact counter string and adds every counter to {@code job}
     * as {@code counter name -> value}.
     *
     * <p>If the string cannot be parsed, a warning is logged and {@code job} is left
     * untouched.
     *
     * @param job      destination map
     * @param counters counter string in the escaped compact format
     */
    private static void parseAndAddJobCounters(Map<String, String> job, String counters) {
        Counters parsed;
        try {
            parsed = Counters.fromEscapedCompactString(counters);
        } catch (ParseException e) {
            LOG.warn("Failed to parse job counters", e);
            return;
        }
        for (Counters.Group listedGroup : parsed) {
            Counters.Group resolvedGroup = parsed.getGroup(listedGroup.getName());
            for (Counters.Counter listedCounter : listedGroup) {
                Counters.Counter resolvedCounter = resolvedGroup.getCounterForName(listedCounter.getName());
                long counterValue = resolvedCounter.getValue();
                job.put(listedCounter.getName(), String.valueOf(counterValue));
            }
        }
    }

    /**
     * Computes per-job summary statistics over all MAP and REDUCE tasks and stores
     * them in {@code value.mapTask} and {@code value.reduceTask}.
     *
     * <p>For each task, the values of its successful attempt (if one exists) are
     * merged into the task's own value map before the task is measured. A task
     * without a successful attempt is logged as "SKIPPING" but is still counted
     * and measured using its own values.
     *
     * <p>When at least one MAP task exists the map-task map receives
     * {@code MIN_MAP_TIME}, {@code MAX_MAP_TIME}, {@code AVG_MAP_TIME},
     * {@code MIN_MAP_INPUT_ROWS}, {@code MAX_MAP_INPUT_ROWS} and {@code NUMBER_MAPS}.
     * The reduce-task map receives the corresponding {@code *_REDUCE_*} keys, or only
     * {@code NUMBER_REDUCES = 0} when the job had no REDUCE task. CLEANUP tasks are
     * ignored; any other task type is logged as unknown.
     *
     * @param value   holder whose map-task and reduce-task maps are filled
     * @param taskMap all tasks of the job, keyed by task id
     */
    private static void populateMapReduceTaskLists(MRJobInfo value, Map<String, JobHistory.Task> taskMap) {
        TaskStats mapStats = new TaskStats();
        TaskStats reduceStats = new TaskStats();

        for (JobHistory.Task task : taskMap.values()) {
            String taskType = task.get(JobHistory.Keys.TASK_TYPE);
            if ("MAP".equals(taskType)) {
                if (!mergeSuccessfulAttempt(task)) {
                    LOG.info("Task:<" + task.get(JobHistory.Keys.TASKID) + "> is not successful - SKIPPING");
                }
                mapStats.record(task.getValues(), MAP_INPUT_RECORDS, "MAP");
            } else if ("REDUCE".equals(taskType)) {
                if (!mergeSuccessfulAttempt(task)) {
                    LOG.warn("Task:<" + task.get(JobHistory.Keys.TASKID) + "> is not successful - SKIPPING");
                }
                reduceStats.record(task.getValues(), REDUCE_INPUT_RECORDS, "REDUCE");
            } else if ("CLEANUP".equals(taskType)) {
                LOG.info("IGNORING TASK TYPE : " + taskType);
            } else {
                LOG.warn("UNKNOWN TASK TYPE : " + taskType);
            }
        }

        Map<String, String> mapT = value.mapTask;
        if (mapStats.count > 0) {
            mapT.put("MIN_MAP_TIME", String.valueOf(mapStats.minTime));
            mapT.put("MAX_MAP_TIME", String.valueOf(mapStats.maxTime));
            mapT.put("MIN_MAP_INPUT_ROWS", String.valueOf(mapStats.minRows));
            mapT.put("MAX_MAP_INPUT_ROWS", String.valueOf(mapStats.maxRows));
            mapT.put("AVG_MAP_TIME", String.valueOf(mapStats.totalTime / mapStats.count));
            mapT.put("NUMBER_MAPS", String.valueOf(mapStats.count));
        }

        Map<String, String> reduceT = value.reduceTask;
        if (reduceStats.count > 0) {
            reduceT.put("MIN_REDUCE_TIME", String.valueOf(reduceStats.minTime));
            reduceT.put("MAX_REDUCE_TIME", String.valueOf(reduceStats.maxTime));
            reduceT.put("AVG_REDUCE_TIME", String.valueOf(reduceStats.totalTime / reduceStats.count));
            // Row statistics come from the row counters. These two keys used to be
            // filled with the min/max reduce *time* by mistake.
            reduceT.put("MIN_REDUCE_INPUT_ROWS", String.valueOf(reduceStats.minRows));
            reduceT.put("MAX_REDUCE_INPUT_ROWS", String.valueOf(reduceStats.maxRows));
            reduceT.put("NUMBER_REDUCES", String.valueOf(reduceStats.count));
        } else {
            reduceT.put("NUMBER_REDUCES", String.valueOf(0));
        }
    }

    /**
     * Merges the values of the task's successful attempt into the task's own values.
     *
     * @return {@code true} if a successful attempt was found and merged
     */
    private static boolean mergeSuccessfulAttempt(JobHistory.Task task) {
        Map<JobHistory.Keys, String> attemptValues = getLastSuccessfulTaskAttempt(task);
        if (attemptValues == null) {
            return false;
        }
        task.getValues().putAll(attemptValues);
        return true;
    }

    /** Running min/max/total of duration and input rows for one task type. */
    private static final class TaskStats {
        // Row minimum stays at Long.MAX_VALUE if no task supplied a parsable counter string.
        long minRows = Long.MAX_VALUE;
        long maxRows = 0L;
        long minTime = Long.MAX_VALUE;
        long maxTime = 0L;
        long totalTime = 0L;
        int count = 0;

        /**
         * Folds one task into the statistics.
         *
         * @param taskValues      the task's history values
         * @param rowsCounterName counter holding the task's input record count
         * @param typeLabel       task type name used in the "not included" log message
         */
        void record(Map<JobHistory.Keys, String> taskValues, String rowsCounterName, String typeLabel) {
            long startTime = 0L;
            long endTime = 0L;
            ++count;
            for (Map.Entry<JobHistory.Keys, String> entry : taskValues.entrySet()) {
                JobHistory.Keys key = entry.getKey();
                String val = entry.getValue();
                switch (key) {
                    case START_TIME:
                        startTime = Long.parseLong(val);
                        break;
                    case FINISH_TIME:
                        endTime = Long.parseLong(val);
                        break;
                    case COUNTERS:
                        try {
                            long rows = Counters.fromEscapedCompactString(val)
                                    .getGroup(TASK_COUNTER_GROUP)
                                    .getCounterForName(rowsCounterName)
                                    .getCounter();
                            minRows = Math.min(minRows, rows);
                            maxRows = Math.max(maxRows, rows);
                        } catch (ParseException e) {
                            LOG.warn("Failed to parse job counters", e);
                        }
                        break;
                    default:
                        LOG.warn("JobHistory.Keys." + key + " : NOT INCLUDED IN PERFORMANCE ADVISOR "
                                + typeLabel + " COUNTERS");
                        break;
                }
            }
            // A missing start or finish time leaves that side at 0.
            long duration = endTime - startTime;
            minTime = Math.min(minTime, duration);
            maxTime = Math.max(maxTime, duration);
            totalTime += duration;
        }
    }

    /**
     * Finds a successful attempt of the given task and returns its values.
     *
     * <p>Despite the method name, the attempts are scanned in the iteration order of
     * the task's attempt map and the first one whose {@code TASK_STATUS} equals
     * {@code "SUCCESS"} is returned.
     *
     * @param task task whose attempts are inspected
     * @return the value map of a successful attempt, or {@code null} if there is none
     */
    private static Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
        for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
            if (attempt == null) {
                continue;
            }
            Map<JobHistory.Keys, String> attemptValues = attempt.getValues();
            if (attemptValues == null || !attemptValues.containsKey(JobHistory.Keys.TASK_STATUS)) {
                continue;
            }
            if (attemptValues.get(JobHistory.Keys.TASK_STATUS).equals("SUCCESS")) {
                return attemptValues;
            }
        }
        return null;
    }

    /**
     * Parses a job configuration XML stream and extracts the properties listed in
     * {@code XML_KEYS}, keyed by their reporting names.
     *
     * <p>Parsing is best effort: any exception is logged as a warning and whatever
     * was collected up to that point is returned. The stream is not closed here.
     *
     * @param in job configuration XML
     * @return the extracted entries; never {@code null}, possibly empty
     */
    public static Map<String, String> parseJobXML(InputStream in) {
        Map<String, String> extracted = new HashMap<String, String>();
        try {
            SAXParserFactory factory = SAXParserFactory.newInstance();
            factory.newSAXParser().parse(in, new JobXMLHandler(extracted));
        } catch (Exception e) {
            LOG.warn("Failed to parser job xml", e);
        }
        return extracted;
    }

    // Fills XML_KEYS: each row pairs a job-configuration property name with the key
    // under which its value is reported.
    static {
        String[][] propertyToKey = {
            {"mapred.job.queue.name", "QUEUE_NAME"},
            {"group.name", "USER_GROUP"},
            {"user.name", "USER"},
            {"user.dir", "HOST_DIR"},
            {"cluster", "CLUSTER"},
            {"jobName", "JOB_NAME"},
            {"pig.script.id", "PIG_SCRIPT_ID"},
            {"pig.script", "PIG_SCRIPT"},
            {"pig.hadoop.version", "HADOOP_VERSION"},
            {"pig.version", "PIG_VERSION"},
            {"pig.job.feature", "PIG_JOB_FEATURE"},
            {"pig.alias", "PIG_JOB_ALIAS"},
            {"pig.parent.jobid", "PIG_JOB_PARENTS"},
            {"pig.host", "HOST_NAME"},
        };
        for (String[] pair : propertyToKey) {
            XML_KEYS.put(pair[0], pair[1]);
        }
    }

    /**
     * SAX handler that collects selected {@code <name>}/{@code <value>} pairs from a
     * job configuration document.
     *
     * <p>Whenever a {@code value} element closes, the most recently seen {@code name}
     * text is looked up in {@code XML_KEYS}; if it is listed, the value text is
     * stored in the target map under the mapped key. Only elements nested below the
     * document root are considered.
     *
     * <p>All parsing state is per instance, so each handler is independent of any
     * other handler and of earlier (possibly failed) parses.
     */
    private static class JobXMLHandler
    extends DefaultHandler {
        private static final String NAME = "name";
        private static final String VALUE = "value";

        /** Open elements, innermost on top. */
        private final Stack<String> tags = new Stack<String>();
        /** Text seen since the last element start or end. */
        private final StringBuilder text = new StringBuilder();
        /** Destination for the extracted entries. */
        private final Map<String, String> xmlMap;
        /** Text of the most recently closed {@code name} element. */
        private String key;

        /**
         * @param xml map that receives the extracted entries
         */
        public JobXMLHandler(Map<String, String> xml) {
            this.xmlMap = xml;
        }

        @Override
        public void startElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
            tags.push(qName);
            text.setLength(0);
        }

        /**
         * Checks element nesting and, for {@code name}/{@code value} elements below the
         * root, consumes the text buffered since the element started.
         *
         * @throws SAXException if the closing tag does not match the innermost open tag
         */
        @Override
        public void endElement(String uri, String localName, String qName) throws SAXException {
            // Depth must be taken before the pop: the root element itself has depth 1.
            boolean belowRoot = tags.size() > 1;
            String tag = tags.isEmpty() ? null : tags.pop();
            if (tag == null || !tag.equalsIgnoreCase(qName)) {
                throw new SAXException("Malformatted XML file: " + tag + " : " + qName);
            }
            // An element with no text stores nothing.
            if (belowRoot && text.length() > 0) {
                String content = text.toString();
                if (NAME.equalsIgnoreCase(qName)) {
                    key = content;
                } else if (VALUE.equalsIgnoreCase(qName)) {
                    String displayKey = XML_KEYS.get(key);
                    if (displayKey != null) {
                        xmlMap.put(displayKey, content);
                    }
                }
            }
            // Text that follows this end tag belongs to the parent, not to this element.
            text.setLength(0);
        }

        /**
         * Buffers character data. A parser may deliver one text node in several
         * calls, so the text is only interpreted once the element ends.
         */
        @Override
        public void characters(char[] ch, int start, int length) throws SAXException {
            text.append(ch, start, length);
        }
    }

    /**
     * Names of job-level fields. {@code populateJob} uses the {@code toString()} of
     * a subset of these constants as keys of the job map; the remaining constants
     * are not referenced elsewhere in this class.
     */
    public static enum JobKeys {
        JOBTRACKERID,
        JOBID,
        JOBNAME,
        JOBTYPE,
        USER,
        SUBMIT_TIME,
        CONF_PATH,
        LAUNCH_TIME,
        TOTAL_MAPS,
        TOTAL_REDUCES,
        STATUS,
        FINISH_TIME,
        FINISHED_MAPS,
        FINISHED_REDUCES,
        FAILED_MAPS,
        FAILED_REDUCES,
        LAUNCHED_MAPS,
        LAUNCHED_REDUCES,
        RACKLOCAL_MAPS,
        DATALOCAL_MAPS,
        HDFS_BYTES_READ,
        HDFS_BYTES_WRITTEN,
        FILE_BYTES_READ,
        FILE_BYTES_WRITTEN,
        COMBINE_OUTPUT_RECORDS,
        COMBINE_INPUT_RECORDS,
        REDUCE_INPUT_GROUPS,
        REDUCE_INPUT_RECORDS,
        REDUCE_OUTPUT_RECORDS,
        MAP_INPUT_RECORDS,
        MAP_OUTPUT_RECORDS,
        MAP_INPUT_BYTES,
        MAP_OUTPUT_BYTES,
        MAP_HDFS_BYTES_WRITTEN,
        JOBCONF,
        JOB_PRIORITY,
        SHUFFLE_BYTES,
        SPILLED_RECORDS;

    }

    /**
     * Value type produced by the record reader: three string maps describing one job.
     * {@code getNext()} exposes them as fields 0, 1 and 2 of the returned tuple.
     */
    public static class MRJobInfo {
        /** Job-level values, counters, and selected job-configuration properties. */
        public Map<String, String> job = new HashMap<String, String>();
        /** Summary statistics over the job's MAP tasks. */
        public Map<String, String> mapTask = new HashMap<String, String>();
        /** Summary statistics over the job's REDUCE tasks. */
        public Map<String, String> reduceTask = new HashMap<String, String>();
    }

    public static class HadoopJobHistoryReader
    extends RecordReader<Text, MRJobInfo> {
        private String location;
        private MRJobInfo value;
        private Configuration conf;

        public void close() throws IOException {
        }

        public Text getCurrentKey() throws IOException, InterruptedException {
            return null;
        }

        public MRJobInfo getCurrentValue() throws IOException, InterruptedException {
            return this.value;
        }

        public float getProgress() throws IOException, InterruptedException {
            return 0.0f;
        }

        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            FileSplit fSplit = (FileSplit)split;
            Path p = fSplit.getPath();
            this.location = p.toString();
            LOG.info((Object)("location: " + this.location));
            this.conf = context.getConfiguration();
        }

        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (this.location != null) {
                LOG.info((Object)("load: " + this.location));
                Path full = new Path(this.location);
                String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName((String)full.getName()).split("_");
                String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
                JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
                this.value = new MRJobInfo();
                FileSystem fs = full.getFileSystem(this.conf);
                FileStatus fstat = fs.getFileStatus(full);
                LOG.info((Object)("file size: " + fstat.getLen()));
                DefaultJobHistoryParser.parseJobTasks((String)this.location, (JobHistory.JobInfo)job, (FileSystem)full.getFileSystem(this.conf));
                LOG.info((Object)"job history parsed sucessfully");
                HadoopJobHistoryLoader.parseJobHistory(this.conf, job, this.value);
                LOG.info((Object)"get parsed job history");
                Path parent = full.getParent();
                String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2] + "_conf.xml";
                Path p = new Path(parent, jobXml);
                FSDataInputStream fileIn = fs.open(p);
                Map<String, String> val = HadoopJobHistoryLoader.parseJobXML((InputStream)fileIn);
                for (String key : val.keySet()) {
                    this.value.job.put(key, val.get(key));
                }
                this.location = null;
                return true;
            }
            this.value = null;
            return false;
        }
    }

    public static class HadoopJobHistoryInputFormat
    extends FileInputFormat<Text, MRJobInfo> {
        public RecordReader<Text, MRJobInfo> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            return new HadoopJobHistoryReader();
        }

        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    }

    public static class JobHistoryPathFilter
    implements PathFilter {
        public boolean accept(Path p) {
            String name = p.getName();
            return !name.endsWith(".xml");
        }
    }
}

