/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.jobhistory;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.EventWriter;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

public class JobHistoryEventHandler
extends AbstractService
implements EventHandler<JobHistoryEvent> {
    private final AppContext context;
    private final int startCount;
    private int eventCounter;
    private FileSystem stagingDirFS;
    private FileSystem doneDirFS;
    private Path stagingDirPath = null;
    private Path doneDirPrefixPath = null;
    private int maxUnflushedCompletionEvents;
    private int postJobCompletionMultiplier;
    private long flushTimeout;
    private int minQueueSizeForBatchingFlushes;
    private int numUnflushedCompletionEvents = 0;
    private boolean isTimerActive;
    protected BlockingQueue<JobHistoryEvent> eventQueue = new LinkedBlockingQueue<JobHistoryEvent>();
    protected Thread eventHandlingThread;
    private volatile boolean stopped;
    private final Object lock = new Object();
    private static final Log LOG = LogFactory.getLog(JobHistoryEventHandler.class);
    protected static final Map<JobId, MetaInfo> fileMap = Collections.synchronizedMap(new HashMap());
    protected volatile boolean forceJobCompletion = false;

    public JobHistoryEventHandler(AppContext context, int startCount) {
        super("JobHistoryEventHandler");
        this.context = context;
        this.startCount = startCount;
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        String userDoneDirStr;
        block9: {
            String jobId = TypeConverter.fromYarn(this.context.getApplicationID()).toString();
            String stagingDirStr = null;
            String doneDirStr = null;
            userDoneDirStr = null;
            try {
                stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
                doneDirStr = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
                userDoneDirStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
            }
            catch (IOException e) {
                LOG.error("Failed while getting the configured log directories", e);
                throw new YarnRuntimeException(e);
            }
            try {
                this.stagingDirPath = FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
                this.stagingDirFS = FileSystem.get(this.stagingDirPath.toUri(), conf);
                this.mkdir(this.stagingDirFS, this.stagingDirPath, new FsPermission(JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
            }
            catch (IOException e) {
                LOG.error("Failed while checking for/creating  history staging path: [" + this.stagingDirPath + "]", e);
                throw new YarnRuntimeException(e);
            }
            Path doneDirPath = null;
            try {
                doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
                this.doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
                if (this.doneDirFS.exists(doneDirPath)) break block9;
                if (JobHistoryUtils.shouldCreateNonUserDirectory(conf)) {
                    LOG.info("Creating intermediate history logDir: [" + doneDirPath + "] + based on conf. Should ideally be created by the JobHistoryServer: " + "yarn.app.mapreduce.am.create-intermediate-jh-base-dir");
                    this.mkdir(this.doneDirFS, doneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
                    break block9;
                }
                String message = "Not creating intermediate history logDir: [" + doneDirPath + "] based on conf: " + "yarn.app.mapreduce.am.create-intermediate-jh-base-dir" + ". Either set to true or pre-create this directory with" + " appropriate permissions";
                LOG.error(message);
                throw new YarnRuntimeException(message);
            }
            catch (IOException e) {
                LOG.error("Failed checking for the existance of history intermediate done directory: [" + doneDirPath + "]");
                throw new YarnRuntimeException(e);
            }
        }
        try {
            this.doneDirPrefixPath = FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
            this.mkdir(this.doneDirFS, this.doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
        }
        catch (IOException e) {
            LOG.error("Error creating user intermediate history done directory: [ " + this.doneDirPrefixPath + "]", e);
            throw new YarnRuntimeException(e);
        }
        this.maxUnflushedCompletionEvents = conf.getInt("yarn.app.mapreduce.am.history.max-unflushed-events", 200);
        this.postJobCompletionMultiplier = conf.getInt("yarn.app.mapreduce.am.history.job-complete-unflushed-multiplier", 30);
        this.flushTimeout = conf.getLong("yarn.app.mapreduce.am.history.complete-event-flush-timeout", 30000L);
        this.minQueueSizeForBatchingFlushes = conf.getInt("yarn.app.mapreduce.am.history.use-batched-flush.queue-size.threshold", 50);
        super.serviceInit(conf);
    }

    private void mkdir(FileSystem fs, Path path, FsPermission fsp) throws IOException {
        if (!fs.exists(path)) {
            try {
                fs.mkdirs(path, fsp);
                FileStatus fsStatus = fs.getFileStatus(path);
                LOG.info("Perms after creating " + fsStatus.getPermission().toShort() + ", Expected: " + fsp.toShort());
                if (fsStatus.getPermission().toShort() != fsp.toShort()) {
                    LOG.info("Explicitly setting permissions to : " + fsp.toShort() + ", " + fsp);
                    fs.setPermission(path, fsp);
                }
            }
            catch (FileAlreadyExistsException e) {
                LOG.info("Directory: [" + path + "] already exists.");
            }
        }
    }

    @Override
    protected void serviceStart() throws Exception {
        this.eventHandlingThread = new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                JobHistoryEvent event = null;
                while (!JobHistoryEventHandler.this.stopped && !Thread.currentThread().isInterrupted()) {
                    if (JobHistoryEventHandler.this.eventCounter != 0 && JobHistoryEventHandler.this.eventCounter % 1000 == 0) {
                        JobHistoryEventHandler.this.eventCounter = 0;
                        LOG.info("Size of the JobHistory event queue is " + JobHistoryEventHandler.this.eventQueue.size());
                    } else {
                        JobHistoryEventHandler.this.eventCounter++;
                    }
                    try {
                        event = JobHistoryEventHandler.this.eventQueue.take();
                    }
                    catch (InterruptedException e) {
                        LOG.info("EventQueue take interrupted. Returning");
                        return;
                    }
                    Object object = JobHistoryEventHandler.this.lock;
                    synchronized (object) {
                        boolean isInterrupted = Thread.interrupted();
                        JobHistoryEventHandler.this.handleEvent(event);
                        if (isInterrupted) {
                            LOG.debug("Event handling interrupted");
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }, "eventHandlingThread");
        this.eventHandlingThread.start();
        super.serviceStart();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void serviceStop() throws Exception {
        LOG.info("Stopping JobHistoryEventHandler. Size of the outstanding queue size is " + this.eventQueue.size());
        this.stopped = true;
        Object object = this.lock;
        synchronized (object) {
            if (this.eventHandlingThread != null) {
                LOG.debug("Interrupting Event Handling thread");
                this.eventHandlingThread.interrupt();
            } else {
                LOG.debug("Null event handling thread");
            }
        }
        try {
            if (this.eventHandlingThread != null) {
                LOG.debug("Waiting for Event Handling thread to complete");
                this.eventHandlingThread.join();
            }
        }
        catch (InterruptedException ie) {
            LOG.info("Interrupted Exception while stopping", ie);
        }
        for (MetaInfo mi : fileMap.values()) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Shutting down timer for " + mi);
                }
                mi.shutDownTimer();
            }
            catch (IOException e) {
                LOG.info("Exception while cancelling delayed flush timer. Likely caused by a failed flush " + e.getMessage());
            }
        }
        for (JobHistoryEvent ev : this.eventQueue) {
            LOG.info("In stop, writing event " + ev.getType());
            this.handleEvent(ev);
        }
        Iterator<JobId> jobIt = fileMap.keySet().iterator();
        if (this.forceJobCompletion) {
            while (jobIt.hasNext()) {
                JobId toClose = jobIt.next();
                MetaInfo mi = fileMap.get(toClose);
                if (mi == null || !mi.isWriterActive()) continue;
                LOG.warn("Found jobId " + toClose + " to have not been closed. Will close");
                JobUnsuccessfulCompletionEvent jucEvent = new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose), System.currentTimeMillis(), this.context.getJob(toClose).getCompletedMaps(), this.context.getJob(toClose).getCompletedReduces(), JobState.KILLED.toString());
                JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
                this.handleEvent(jfEvent);
            }
        }
        for (MetaInfo mi : fileMap.values()) {
            try {
                mi.closeWriter();
            }
            catch (IOException e) {
                LOG.info("Exception while closing file " + e.getMessage());
            }
        }
        LOG.info("Stopped JobHistoryEventHandler. super.stop()");
        super.serviceStop();
    }

    protected EventWriter createEventWriter(Path historyFilePath) throws IOException {
        FSDataOutputStream out = this.stagingDirFS.create(historyFilePath, true);
        return new EventWriter(out);
    }

    protected void setupEventWriter(JobId jobId) throws IOException {
        if (this.stagingDirPath == null) {
            LOG.error("Log Directory is null, returning");
            throw new IOException("Missing Log Directory for History");
        }
        MetaInfo oldFi = fileMap.get(jobId);
        Configuration conf = this.getConfig();
        Path historyFile = JobHistoryUtils.getStagingJobHistoryFile(this.stagingDirPath, jobId, this.startCount);
        String user = UserGroupInformation.getCurrentUser().getShortUserName();
        if (user == null) {
            throw new IOException("User is null while setting up jobhistory eventwriter");
        }
        String jobName = this.context.getJob(jobId).getName();
        EventWriter writer = oldFi == null ? null : oldFi.writer;
        Path logDirConfPath = JobHistoryUtils.getStagingConfFile(this.stagingDirPath, jobId, this.startCount);
        if (writer == null) {
            try {
                writer = this.createEventWriter(historyFile);
                LOG.info("Event Writer setup for JobId: " + jobId + ", File: " + historyFile);
            }
            catch (IOException ioe) {
                LOG.info("Could not create log file: [" + historyFile + "] + for job " + "[" + jobName + "]");
                throw ioe;
            }
            if (conf != null) {
                FSDataOutputStream jobFileOut = null;
                try {
                    if (logDirConfPath != null) {
                        jobFileOut = this.stagingDirFS.create(logDirConfPath, true);
                        conf.writeXml(jobFileOut);
                        jobFileOut.close();
                    }
                }
                catch (IOException e) {
                    LOG.info("Failed to write the job configuration file", e);
                    throw e;
                }
            }
        }
        MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, user, jobName, jobId);
        fi.getJobSummary().setJobId(jobId);
        fileMap.put(jobId, fi);
    }

    public void closeWriter(JobId id) throws IOException {
        try {
            MetaInfo mi = fileMap.get(id);
            if (mi != null) {
                mi.closeWriter();
            }
        }
        catch (IOException e) {
            LOG.error("Error closing writer for JobID: " + id);
            throw e;
        }
    }

    @Override
    public void handle(JobHistoryEvent event) {
        try {
            if (this.isJobCompletionEvent(event.getHistoryEvent())) {
                this.maxUnflushedCompletionEvents *= this.postJobCompletionMultiplier;
            }
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new YarnRuntimeException(e);
        }
    }

    private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
        return EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED, EventType.JOB_KILLED).contains((Object)historyEvent.getEventType());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleEvent(JobHistoryEvent event) {
        Object object = this.lock;
        synchronized (object) {
            JobUnsuccessfulCompletionEvent jucEvent;
            if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
                try {
                    this.setupEventWriter(event.getJobID());
                }
                catch (IOException ioe) {
                    LOG.error("Error JobHistoryEventHandler in handleEvent: " + event, ioe);
                    throw new YarnRuntimeException(ioe);
                }
            }
            MetaInfo mi = fileMap.get(event.getJobID());
            try {
                HistoryEvent historyEvent = event.getHistoryEvent();
                if (!(historyEvent instanceof NormalizedResourceEvent)) {
                    mi.writeEvent(historyEvent);
                }
                this.processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("In HistoryEventHandler " + (Object)((Object)event.getHistoryEvent().getEventType()));
                }
            }
            catch (IOException e) {
                LOG.error("Error writing History Event: " + event.getHistoryEvent(), e);
                throw new YarnRuntimeException(e);
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
                JobSubmittedEvent jobSubmittedEvent = (JobSubmittedEvent)event.getHistoryEvent();
                mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
                mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
                try {
                    JobFinishedEvent jFinishedEvent = (JobFinishedEvent)event.getHistoryEvent();
                    mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
                    mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
                    mi.getJobIndexInfo().setNumReduces(jFinishedEvent.getFinishedReduces());
                    mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
                    this.closeEventWriter(event.getJobID());
                    this.processDoneFiles(event.getJobID());
                }
                catch (IOException e) {
                    throw new YarnRuntimeException(e);
                }
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_ERROR) {
                try {
                    jucEvent = (JobUnsuccessfulCompletionEvent)event.getHistoryEvent();
                    mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
                    mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
                    mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
                    mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
                    this.closeEventWriter(event.getJobID());
                    if (this.context.isLastAMRetry()) {
                        this.processDoneFiles(event.getJobID());
                    }
                }
                catch (IOException e) {
                    throw new YarnRuntimeException(e);
                }
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
                try {
                    jucEvent = (JobUnsuccessfulCompletionEvent)event.getHistoryEvent();
                    mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
                    mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
                    mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
                    mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
                    this.closeEventWriter(event.getJobID());
                    this.processDoneFiles(event.getJobID());
                }
                catch (IOException e) {
                    throw new YarnRuntimeException(e);
                }
            }
        }
    }

    public void processEventForJobSummary(HistoryEvent event, JobSummary summary, JobId jobId) {
        switch (event.getEventType()) {
            case JOB_SUBMITTED: {
                JobSubmittedEvent jse = (JobSubmittedEvent)event;
                summary.setUser(jse.getUserName());
                summary.setQueue(jse.getJobQueueName());
                summary.setJobSubmitTime(jse.getSubmitTime());
                summary.setJobName(jse.getJobName());
                break;
            }
            case NORMALIZED_RESOURCE: {
                NormalizedResourceEvent normalizedResourceEvent = (NormalizedResourceEvent)event;
                if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
                    summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
                    break;
                }
                if (normalizedResourceEvent.getTaskType() != TaskType.REDUCE) break;
                summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
                break;
            }
            case JOB_INITED: {
                JobInitedEvent jie = (JobInitedEvent)event;
                summary.setJobLaunchTime(jie.getLaunchTime());
                break;
            }
            case MAP_ATTEMPT_STARTED: {
                TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent)event;
                if (summary.getFirstMapTaskLaunchTime() != 0L) break;
                summary.setFirstMapTaskLaunchTime(mtase.getStartTime());
                break;
            }
            case REDUCE_ATTEMPT_STARTED: {
                TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent)event;
                if (summary.getFirstReduceTaskLaunchTime() != 0L) break;
                summary.setFirstReduceTaskLaunchTime(rtase.getStartTime());
                break;
            }
            case JOB_FINISHED: {
                JobFinishedEvent jfe = (JobFinishedEvent)event;
                summary.setJobFinishTime(jfe.getFinishTime());
                summary.setNumFinishedMaps(jfe.getFinishedMaps());
                summary.setNumFailedMaps(jfe.getFailedMaps());
                summary.setNumFinishedReduces(jfe.getFinishedReduces());
                summary.setNumFailedReduces(jfe.getFailedReduces());
                if (summary.getJobStatus() == null) {
                    summary.setJobStatus(JobStatus.State.SUCCEEDED.toString());
                }
                this.setSummarySlotSeconds(summary, jfe.getTotalCounters());
                break;
            }
            case JOB_FAILED: 
            case JOB_KILLED: {
                JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent)event;
                summary.setJobStatus(juce.getStatus());
                summary.setNumFinishedMaps(this.context.getJob(jobId).getTotalMaps());
                summary.setNumFinishedReduces(this.context.getJob(jobId).getTotalReduces());
                summary.setJobFinishTime(juce.getFinishTime());
                this.setSummarySlotSeconds(summary, this.context.getJob(jobId).getAllCounters());
                break;
            }
        }
    }

    private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
        Object slotMillisReduceCounter;
        Object slotMillisMapCounter = allCounters.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
        if (slotMillisMapCounter != null) {
            summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000L);
        }
        if ((slotMillisReduceCounter = allCounters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES)) != null) {
            summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000L);
        }
    }

    protected void closeEventWriter(JobId jobId) throws IOException {
        MetaInfo mi = fileMap.get(jobId);
        if (mi == null) {
            throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
        }
        if (!mi.isWriterActive()) {
            throw new IOException("Inactive Writer: Likely received multiple JobFinished / JobUnsuccessful events for JobId: [" + jobId + "]");
        }
        try {
            mi.closeWriter();
        }
        catch (IOException e) {
            LOG.error("Error closing writer for JobID: " + jobId);
            throw e;
        }
    }

    protected void processDoneFiles(JobId jobId) throws IOException {
        MetaInfo mi = fileMap.get(jobId);
        if (mi == null) {
            throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
        }
        if (mi.getHistoryFile() == null) {
            LOG.warn("No file for job-history with " + jobId + " found in cache!");
        }
        if (mi.getConfFile() == null) {
            LOG.warn("No file for jobconf with " + jobId + " found in cache!");
        }
        Path qualifiedSummaryDoneFile = null;
        FSDataOutputStream summaryFileOut = null;
        try {
            String doneSummaryFileName = this.getTempFileName(JobHistoryUtils.getIntermediateSummaryFileName(jobId));
            qualifiedSummaryDoneFile = this.doneDirFS.makeQualified(new Path(this.doneDirPrefixPath, doneSummaryFileName));
            summaryFileOut = this.doneDirFS.create(qualifiedSummaryDoneFile, true);
            summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
            summaryFileOut.close();
            this.doneDirFS.setPermission(qualifiedSummaryDoneFile, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
        }
        catch (IOException e) {
            LOG.info("Unable to write out JobSummaryInfo to [" + qualifiedSummaryDoneFile + "]", e);
            throw e;
        }
        try {
            Path qualifiedDoneFile = null;
            if (mi.getHistoryFile() != null) {
                Path historyFile = mi.getHistoryFile();
                Path qualifiedLogFile = this.stagingDirFS.makeQualified(historyFile);
                String doneJobHistoryFileName = this.getTempFileName(FileNameIndexUtils.getDoneFileName(mi.getJobIndexInfo()));
                qualifiedDoneFile = this.doneDirFS.makeQualified(new Path(this.doneDirPrefixPath, doneJobHistoryFileName));
                this.moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
            }
            Path qualifiedConfDoneFile = null;
            if (mi.getConfFile() != null) {
                Path confFile = mi.getConfFile();
                Path qualifiedConfFile = this.stagingDirFS.makeQualified(confFile);
                String doneConfFileName = this.getTempFileName(JobHistoryUtils.getIntermediateConfFileName(jobId));
                qualifiedConfDoneFile = this.doneDirFS.makeQualified(new Path(this.doneDirPrefixPath, doneConfFileName));
                this.moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
            }
            this.moveTmpToDone(qualifiedSummaryDoneFile);
            this.moveTmpToDone(qualifiedConfDoneFile);
            this.moveTmpToDone(qualifiedDoneFile);
        }
        catch (IOException e) {
            LOG.error("Error closing writer for JobID: " + jobId);
            throw e;
        }
    }

    private void moveTmpToDone(Path tmpPath) throws IOException {
        if (tmpPath != null) {
            String tmpFileName = tmpPath.getName();
            String fileName = this.getFileNameFromTmpFN(tmpFileName);
            Path path = new Path(tmpPath.getParent(), fileName);
            this.doneDirFS.rename(tmpPath, path);
            LOG.info("Moved tmp to done: " + tmpPath + " to " + path);
        }
    }

    private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
        if (this.stagingDirFS.exists(fromPath)) {
            boolean copied;
            LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
            if (this.doneDirFS.exists(toPath)) {
                this.doneDirFS.delete(toPath, true);
            }
            if (copied = FileUtil.copy(this.stagingDirFS, fromPath, this.doneDirFS, toPath, false, this.getConfig())) {
                LOG.info("Copied to done location: " + toPath);
            } else {
                LOG.info("copy failed");
            }
            this.doneDirFS.setPermission(toPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
        }
    }

    boolean pathExists(FileSystem fileSys, Path path) throws IOException {
        return fileSys.exists(path);
    }

    private String getTempFileName(String srcFile) {
        return srcFile + "_tmp";
    }

    private String getFileNameFromTmpFN(String tmpFileName) {
        return tmpFileName.substring(0, tmpFileName.length() - 4);
    }

    public void setForcejobCompletion(boolean forceJobCompletion) {
        this.forceJobCompletion = forceJobCompletion;
        LOG.info("JobHistoryEventHandler notified that forceJobCompletion is " + forceJobCompletion);
    }

    protected class MetaInfo {
        private Path historyFile;
        private Path confFile;
        private EventWriter writer;
        JobIndexInfo jobIndexInfo;
        JobSummary jobSummary;
        Timer flushTimer;
        FlushTimerTask flushTimerTask;
        private boolean isTimerShutDown = false;

        MetaInfo(Path historyFile, Path conf, EventWriter writer, String user, String jobName, JobId jobId) {
            this.historyFile = historyFile;
            this.confFile = conf;
            this.writer = writer;
            this.jobIndexInfo = new JobIndexInfo(-1L, -1L, user, jobName, jobId, -1, -1, null);
            this.jobSummary = new JobSummary();
            this.flushTimer = new Timer("FlushTimer", true);
        }

        Path getHistoryFile() {
            return this.historyFile;
        }

        Path getConfFile() {
            return this.confFile;
        }

        JobIndexInfo getJobIndexInfo() {
            return this.jobIndexInfo;
        }

        JobSummary getJobSummary() {
            return this.jobSummary;
        }

        boolean isWriterActive() {
            return this.writer != null;
        }

        boolean isTimerShutDown() {
            return this.isTimerShutDown;
        }

        public String toString() {
            return "Job MetaInfo for " + this.jobSummary.getJobId() + " history file " + this.historyFile;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void closeWriter() throws IOException {
            LOG.debug("Closing Writer");
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                if (this.writer != null) {
                    this.writer.close();
                }
                this.writer = null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void writeEvent(HistoryEvent event) throws IOException {
            LOG.debug("Writing event");
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                if (this.writer != null) {
                    this.writer.write(event);
                    this.processEventForFlush(event);
                    this.maybeFlush(event);
                }
            }
        }

        void processEventForFlush(HistoryEvent historyEvent) throws IOException {
            if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED, new EventType[]{EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED, EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED, EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED, EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED, EventType.JOB_KILLED}).contains((Object)historyEvent.getEventType())) {
                JobHistoryEventHandler.this.numUnflushedCompletionEvents++;
                if (!JobHistoryEventHandler.this.isTimerActive) {
                    this.resetFlushTimer();
                    if (!this.isTimerShutDown) {
                        this.flushTimerTask = new FlushTimerTask(this);
                        this.flushTimer.schedule((TimerTask)this.flushTimerTask, JobHistoryEventHandler.this.flushTimeout);
                    }
                }
            }
        }

        void resetFlushTimer() throws IOException {
            if (this.flushTimerTask != null) {
                IOException exception = this.flushTimerTask.getException();
                this.flushTimerTask.stop();
                if (exception != null) {
                    throw exception;
                }
                this.flushTimerTask = null;
            }
            JobHistoryEventHandler.this.isTimerActive = false;
        }

        void maybeFlush(HistoryEvent historyEvent) throws IOException {
            if (JobHistoryEventHandler.this.eventQueue.size() < JobHistoryEventHandler.this.minQueueSizeForBatchingFlushes && JobHistoryEventHandler.this.numUnflushedCompletionEvents > 0 || JobHistoryEventHandler.this.numUnflushedCompletionEvents >= JobHistoryEventHandler.this.maxUnflushedCompletionEvents || JobHistoryEventHandler.this.isJobCompletionEvent(historyEvent)) {
                this.flush();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void flush() throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Flushing " + this.toString());
            }
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                if (JobHistoryEventHandler.this.numUnflushedCompletionEvents != 0) {
                    this.writer.flush();
                    JobHistoryEventHandler.this.numUnflushedCompletionEvents = 0;
                    this.resetFlushTimer();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void shutDownTimer() throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Shutting down timer " + this.toString());
            }
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                this.isTimerShutDown = true;
                this.flushTimer.cancel();
                if (this.flushTimerTask != null && this.flushTimerTask.getException() != null) {
                    throw this.flushTimerTask.getException();
                }
            }
        }
    }

    private class FlushTimerTask
    extends TimerTask {
        private MetaInfo metaInfo;
        private IOException ioe = null;
        private volatile boolean shouldRun = true;

        FlushTimerTask(MetaInfo metaInfo) {
            this.metaInfo = metaInfo;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            LOG.debug("In flush timer task");
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                try {
                    if (!this.metaInfo.isTimerShutDown() && this.shouldRun) {
                        this.metaInfo.flush();
                    }
                }
                catch (IOException e) {
                    this.ioe = e;
                }
            }
        }

        public IOException getException() {
            return this.ioe;
        }

        public void stop() {
            this.shouldRun = false;
            this.cancel();
        }
    }
}

