/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.gridmix.Gridmix;
import org.apache.hadoop.mapred.gridmix.GridmixJob;
import org.apache.hadoop.mapred.gridmix.StatListener;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;

public class Statistics
implements Gridmix.Component<JobStats> {
    public static final Log LOG = LogFactory.getLog(Statistics.class);
    private final StatCollector statistics = new StatCollector();
    private JobClient cluster;
    private final List<StatListener<ClusterStats>> clusterStatlisteners = new CopyOnWriteArrayList<StatListener<ClusterStats>>();
    private final List<StatListener<JobStats>> jobStatListeners = new CopyOnWriteArrayList<StatListener<JobStats>>();
    private static final Map<Integer, JobStats> submittedJobsMap = new ConcurrentHashMap<Integer, JobStats>();
    private static volatile int numMapsSubmitted = 0;
    private static volatile int numReducesSubmitted = 0;
    private int completedJobsInCurrentInterval = 0;
    private final int jtPollingInterval;
    private volatile boolean shutdown = false;
    private final int maxJobCompletedInInterval;
    private static final String MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY = "gridmix.max-jobs-completed-in-poll-interval";
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition jobCompleted = this.lock.newCondition();
    private final CountDownLatch startFlag;

    public Statistics(final Configuration conf, int pollingInterval, CountDownLatch startFlag) throws IOException, InterruptedException {
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        this.cluster = (JobClient)ugi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<JobClient>(){

            @Override
            public JobClient run() throws IOException {
                return new JobClient(new JobConf(conf));
            }
        });
        this.jtPollingInterval = pollingInterval;
        this.maxJobCompletedInInterval = conf.getInt(MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY, 1);
        this.startFlag = startFlag;
    }

    public static JobStats generateJobStats(Job job, JobStory jobdesc) {
        int seq = GridmixJob.getJobSeqId((JobContext)job);
        if (seq >= 0 && jobdesc == null) {
            throw new IllegalArgumentException("JobStory not available for job " + job.getJobID());
        }
        int maps = -1;
        int reds = -1;
        if (jobdesc != null) {
            maps = jobdesc.getNumberMaps();
            reds = jobdesc.getNumberReduces();
        }
        return new JobStats(maps, reds, job);
    }

    private static void addToNumMapsSubmitted(int numMaps) {
        numMapsSubmitted += numMaps;
    }

    private static void addToNumReducesSubmitted(int numReduces) {
        numReducesSubmitted += numReduces;
    }

    private static void subtractFromNumMapsSubmitted(int numMaps) {
        numMapsSubmitted -= numMaps;
    }

    private static void subtractFromNumReducesSubmitted(int numReduces) {
        numReducesSubmitted -= numReduces;
    }

    public void addJobStats(JobStats stats) {
        int seq = GridmixJob.getJobSeqId((JobContext)stats.getJob());
        if (seq < 0) {
            LOG.info((Object)("Not tracking job " + stats.getJob().getJobName() + " as seq id is less than zero: " + seq));
            return;
        }
        submittedJobsMap.put(seq, stats);
        Statistics.addToNumMapsSubmitted(stats.getNoOfMaps());
        Statistics.addToNumReducesSubmitted(stats.getNoOfReds());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void add(JobStats job) {
        if (!this.statistics.isAlive()) {
            return;
        }
        JobStats stat = submittedJobsMap.remove(GridmixJob.getJobSeqId((JobContext)job.getJob()));
        if (stat == null) {
            LOG.error((Object)("[Statistics] Missing entry for job " + job.getJob().getJobID()));
            return;
        }
        Statistics.subtractFromNumMapsSubmitted(stat.getNoOfMaps());
        Statistics.subtractFromNumReducesSubmitted(stat.getNoOfReds());
        ++this.completedJobsInCurrentInterval;
        if (this.completedJobsInCurrentInterval >= this.maxJobCompletedInInterval) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)(" Reached maximum limit of jobs in a polling interval " + this.completedJobsInCurrentInterval));
            }
            this.completedJobsInCurrentInterval = 0;
            this.lock.lock();
            try {
                for (StatListener<JobStats> l : this.jobStatListeners) {
                    l.update(stat);
                }
                this.jobCompleted.signalAll();
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    public void addClusterStatsObservers(StatListener<ClusterStats> listener) {
        this.clusterStatlisteners.add(listener);
    }

    public void addJobStatsListeners(StatListener<JobStats> listener) {
        this.jobStatListeners.add(listener);
    }

    @Override
    public void start() {
        this.statistics.start();
    }

    @Override
    public void join(long millis) throws InterruptedException {
        this.statistics.join(millis);
    }

    @Override
    public void shutdown() {
        this.shutdown = true;
        submittedJobsMap.clear();
        this.clusterStatlisteners.clear();
        this.jobStatListeners.clear();
        this.statistics.interrupt();
    }

    @Override
    public void abort() {
        this.shutdown = true;
        submittedJobsMap.clear();
        this.clusterStatlisteners.clear();
        this.jobStatListeners.clear();
        this.statistics.interrupt();
    }

    static class ClusterStats {
        private ClusterStatus status = null;
        private static ClusterStats stats = new ClusterStats();

        private ClusterStats() {
        }

        static ClusterStats getClusterStats() {
            return stats;
        }

        void setClusterMetric(ClusterStatus metrics) {
            this.status = metrics;
        }

        public ClusterStatus getStatus() {
            return this.status;
        }

        int getNumRunningJob() {
            return submittedJobsMap.size();
        }

        static Collection<JobStats> getRunningJobStats() {
            return submittedJobsMap.values();
        }

        static int getSubmittedMapTasks() {
            return numMapsSubmitted;
        }

        static int getSubmittedReduceTasks() {
            return numReducesSubmitted;
        }
    }

    static class JobStats {
        private final int noOfMaps;
        private final int noOfReds;
        private JobStatus currentStatus;
        private final Job job;

        public JobStats(int noOfMaps, int numOfReds, Job job) {
            this.job = job;
            this.noOfMaps = noOfMaps;
            this.noOfReds = numOfReds;
        }

        public int getNoOfMaps() {
            return this.noOfMaps;
        }

        public int getNoOfReds() {
            return this.noOfReds;
        }

        public Job getJob() {
            return this.job;
        }

        public synchronized void updateJobStatus(JobStatus status) {
            this.currentStatus = status;
        }

        public synchronized JobStatus getJobStatus() {
            return this.currentStatus;
        }
    }

    private class StatCollector
    extends Thread {
        StatCollector() {
            super("StatsCollectorThread");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                Statistics.this.startFlag.await();
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
            }
            catch (InterruptedException ie) {
                LOG.error((Object)"Statistics Error while waiting for other threads to get ready ", (Throwable)ie);
                return;
            }
            while (!Statistics.this.shutdown) {
                Statistics.this.lock.lock();
                try {
                    Statistics.this.jobCompleted.await(Statistics.this.jtPollingInterval, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException ie) {
                    if (!Statistics.this.shutdown) {
                        LOG.error((Object)"Statistics interrupt while waiting for completion of a job.", (Throwable)ie);
                    }
                    return;
                }
                finally {
                    Statistics.this.lock.unlock();
                }
                if (Statistics.this.clusterStatlisteners.size() <= 0) continue;
                try {
                    ClusterStatus clusterStatus = Statistics.this.cluster.getClusterStatus();
                    this.updateAndNotifyClusterStatsListeners(clusterStatus);
                }
                catch (IOException e) {
                    LOG.error((Object)"Statistics io exception while polling JT ", (Throwable)e);
                    return;
                }
            }
        }

        private void updateAndNotifyClusterStatsListeners(ClusterStatus clusterStatus) {
            ClusterStats stats = ClusterStats.getClusterStats();
            stats.setClusterMetric(clusterStatus);
            for (StatListener listener : Statistics.this.clusterStatlisteners) {
                listener.update(stats);
            }
        }
    }
}

