/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.hadoop.mapred;

import com.google.common.annotations.VisibleForTesting;
import com.mapr.baseutils.BaseUtilsHelper;
import com.mapr.fs.MapRPathId;
import com.mapr.hadoop.mapred.MapRDirectShuffleMetaData;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathId;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;

public class LocalVolumeAuxService
extends AuxiliaryService {
    private static final Log LOG = LogFactory.getLog(LocalVolumeAuxService.class);
    private static final String SERVICE_NAME = "direct_shuffle";
    private static final String MAPR_INSTALL_DIR = BaseUtilsHelper.getPathToMaprHome();
    private static final String LOCALHOSTNAME = BaseUtilsHelper.getMapRHostName();
    private static final String LOCAL_VOLUME_CREATE_SCRIPT_PATH = "/server/createTTVolume.sh";
    private static final String LOCAL_VOLUME_CREATE_SCRIPT_LOGFILE_PATH = "/logs/createNMVolume.log";
    private static final String YARN_ARG = "yarn";
    private static final String MAPR_LOCALOUTPUT_DIR_PARAM = "mapr.localoutput.dir";
    private static final String MAPR_LOCALOUTPUT_DIR_DEFAULT = "output";
    private static final String MAPR_LOCALSPILL_DIR_PARAM = "mapr.localspill.dir";
    private static final String MAPR_LOCALSPILL_DIR_DEFAULT = "spill";
    private static final String MAPR_UNCOMPRESSED_SUFFIX = ".U";
    private static final String VOLUME_HEALTH_CHECK_INTERVAL = "mapreduce.volume.healthcheck.interval";
    public static final int LOCAL_VOL_SERVICE_SHUTDOWN_HOOK_PRIORITY = 40;
    private String[] fidRoots;
    private String[] rootDirNames;
    private Configuration conf;
    private FileSystem maprfs;
    private MapRDirectShuffleMetaData metaData;
    private Map<String, MapRDirectShuffleMetaData> jobMetaData = new ConcurrentHashMap<String, MapRDirectShuffleMetaData>();
    private volatile boolean isShuttingDown = false;
    private ScheduledExecutorService volumeChecker;
    private int volumeCheckInterval = 60000;

    protected LocalVolumeAuxService() {
        super(SERVICE_NAME);
        this.metaData = new MapRDirectShuffleMetaData();
        this.volumeChecker = Executors.newSingleThreadScheduledExecutor();
    }

    protected void serviceInit(Configuration conf) throws Exception {
        String spillUDirName;
        String spillDirName;
        String outputUDirName;
        String outputDirName;
        super.serviceInit(conf);
        if (LOCALHOSTNAME == null) {
            throw new ServiceStateException("Cannot initialize direct_shuffle. Error obtaining hostname from " + BaseUtilsHelper.HOST_NAME_FILE_PATH + ".");
        }
        this.conf = conf;
        this.maprfs = FileSystem.get((Configuration)conf);
        this.rootDirNames = new String[5];
        this.rootDirNames[FidId.ROOT.ordinal()] = ".";
        this.rootDirNames[FidId.OUTPUT.ordinal()] = outputDirName = conf.get(MAPR_LOCALOUTPUT_DIR_PARAM, MAPR_LOCALOUTPUT_DIR_DEFAULT);
        this.rootDirNames[FidId.OUTPUT_U.ordinal()] = outputUDirName = outputDirName + MAPR_UNCOMPRESSED_SUFFIX;
        this.rootDirNames[FidId.SPILL.ordinal()] = spillDirName = conf.get(MAPR_LOCALSPILL_DIR_PARAM, MAPR_LOCALSPILL_DIR_DEFAULT);
        this.rootDirNames[FidId.SPILL_U.ordinal()] = spillUDirName = spillDirName + MAPR_UNCOMPRESSED_SUFFIX;
        this.metaData.setNodeManageHostName(LOCALHOSTNAME);
        this.volumeCheckInterval = conf.getInt(VOLUME_HEALTH_CHECK_INTERVAL, this.volumeCheckInterval);
        ShutdownHookManager.get().addShutdownHook((Runnable)new LocalVolumeAuxServiceShutDownHook(this), 40);
    }

    protected void serviceStart() throws Exception {
        this.initVolume();
        this.volumeChecker.scheduleAtFixedRate(new VolumeHealthCheckTask(), this.volumeCheckInterval, this.volumeCheckInterval, TimeUnit.MILLISECONDS);
    }

    private void initVolume() throws IOException {
        String[] args = new String[]{MAPR_INSTALL_DIR + LOCAL_VOLUME_CREATE_SCRIPT_PATH, LOCALHOSTNAME, this.getMapRedLocalVolumeMountPath(), this.getNodeManagerDirPath(), YARN_ARG};
        HashMap<String, String> env = new HashMap<String, String>();
        env.put("MAPR_MAPREDUCE_MODE", YARN_ARG);
        Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(args, null, env);
        LOG.info((Object)("Checking for local volume. If volume is not present command will create and mount it. Command invoked is : " + shexec.toString()));
        try {
            shexec.execute();
            LOG.info((Object)("Sucessfully created volume and mounted at " + args[2]));
        }
        catch (IOException ioe) {
            int exitCode = shexec.getExitCode();
            if (exitCode != 0) {
                LOG.error((Object)("Failed to create and mount local mapreduce volume at " + args[2] + ". Please see logs at " + MAPR_INSTALL_DIR + LOCAL_VOLUME_CREATE_SCRIPT_LOGFILE_PATH));
                LOG.error((Object)("Command ran " + shexec.toString()));
                LOG.error((Object)("Command output " + shexec.getOutput()));
            }
            throw ioe;
        }
        try {
            this.initMapReduceDirs();
        }
        catch (IOException ioe) {
            LOG.error((Object)"Could not initialize directories for mapreduce", (Throwable)ioe);
            throw ioe;
        }
    }

    protected void serviceStop() throws Exception {
        if (LOG.isInfoEnabled()) {
            LOG.info((Object)"Shutting down Volume Checker service");
        }
        this.volumeChecker.shutdown();
    }

    public void initializeApplication(ApplicationInitializationContext context) {
        String jobUser = context.getUser();
        ApplicationId appId = context.getApplicationId();
        JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
        String jobIdStr = jobId.toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("In initializeApplication. Application Id: " + appId.getId() + ", Job Id: " + jobIdStr));
        }
        if (this.jobMetaData.get(jobIdStr) != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Fids for job: " + jobIdStr + " already created. skipping initializeApplication."));
            }
            return;
        }
        LOG.info((Object)("initializeApplication for job: " + jobIdStr + " and user: " + jobUser));
        String jobUserGroup = null;
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser((String)jobUser);
        String[] groupNames = ugi.getGroupNames();
        if (groupNames != null && groupNames.length > 0) {
            jobUserGroup = groupNames[0];
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("User: " + jobUser + " Group: " + jobUserGroup));
        }
        try {
            String[] jobFids = new String[this.fidRoots.length];
            jobFids[FidId.ROOT.ordinal()] = this.fidRoots[FidId.ROOT.ordinal()];
            MapRDirectShuffleMetaData data = new MapRDirectShuffleMetaData();
            PathId shuffleRootPathId = this.metaData.getMapReduceDirsPathIds().get(this.rootDirNames[FidId.ROOT.ordinal()]);
            data.putDirPathId(this.rootDirNames[FidId.ROOT.ordinal()], shuffleRootPathId);
            data.setNodeManageHostName(LOCALHOSTNAME);
            for (int i = FidId.ROOT.ordinal() + 1; i < this.fidRoots.length; ++i) {
                jobFids[i] = this.maprfs.mkdirsFid(this.fidRoots[i], jobIdStr);
                MapRPathId dirPathId = new MapRPathId();
                dirPathId.setFid(jobFids[i]);
                dirPathId.setIps(shuffleRootPathId.getIPs());
                data.putDirPathId(this.rootDirNames[i], (PathId)dirPathId);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)(FidId.values()[i].name() + " fid for " + jobIdStr + ": " + jobFids[i]));
                }
                if (jobUser == null) continue;
                this.maprfs.setOwnerFid(jobFids[i], jobUser, jobUserGroup);
            }
            this.jobMetaData.put(jobIdStr, data);
        }
        catch (IOException e) {
            LOG.error((Object)("Error during initializeApplication. App Id: " + appId.getId() + ", Job Id: " + jobIdStr), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopApplication(ApplicationTerminationContext stopAppContext) {
        ApplicationId appId = stopAppContext.getApplicationId();
        JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
        String jobIdStr = jobId.toString();
        if (this.isShuttingDown) {
            LOG.info((Object)("NodeManager is shutting down but " + appId.toString() + "/" + jobIdStr + " might still be running. " + "Not cleaning up the " + jobIdStr + " directory in the local volume."));
            return;
        }
        try {
            for (int i = FidId.ROOT.ordinal() + 1; i < this.fidRoots.length; ++i) {
                if (this.maprfs.deleteFid(this.fidRoots[i], jobIdStr)) {
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug((Object)("Deleted " + jobIdStr + " from " + (Object)((Object)FidId.values()[i])));
                    continue;
                }
                LOG.warn((Object)(jobIdStr + " could not be deleted from " + (Object)((Object)FidId.values()[i]) + ". Parent Fid: " + this.fidRoots[i]));
            }
        }
        catch (IOException e) {
            LOG.error((Object)("Error during stopApplication. App Id: " + appId.getId() + ", Job Id: " + jobIdStr), (Throwable)e);
        }
        finally {
            this.jobMetaData.remove(jobIdStr);
        }
    }

    public ByteBuffer getMetaData() {
        try {
            DataOutputBuffer dob = new DataOutputBuffer();
            this.metaData.write((DataOutput)dob);
            return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        }
        catch (IOException e) {
            LOG.error((Object)"Encountered error while returning metadata.", (Throwable)e);
            return null;
        }
    }

    public ByteBuffer getMetaData(ContainerInitializationContext context) {
        if (context == null || context.getContainerId() == null || context.getContainerId().getApplicationAttemptId() == null || context.getContainerId().getApplicationAttemptId().getApplicationId() == null) {
            LOG.warn((Object)"context is null in getMetaData for context. Returning service metadata");
            return this.getMetaData();
        }
        ApplicationId appId = context.getContainerId().getApplicationAttemptId().getApplicationId();
        JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
        String jobIdStr = jobId.toString();
        MapRDirectShuffleMetaData jobData = this.jobMetaData.get(jobIdStr);
        if (jobData == null) {
            LOG.error((Object)"Can not find metadata for a job. Returning service metadata");
            return this.getMetaData();
        }
        try {
            DataOutputBuffer dob = new DataOutputBuffer();
            jobData.write((DataOutput)dob);
            return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        }
        catch (IOException e) {
            LOG.error((Object)"Encountered error while returning metadata.", (Throwable)e);
            return null;
        }
    }

    private void initMapReduceDirs() throws IOException {
        String shuffleRootFid;
        this.fidRoots = new String[5];
        this.fidRoots[FidId.ROOT.ordinal()] = shuffleRootFid = this.maprfs.mkdirsFid(new Path(this.getNodeManagerDirPath()));
        MapRPathId shuffleRootPathId = new MapRPathId();
        FSDataOutputStream fileId = this.maprfs.createFid(shuffleRootFid, "fidservers");
        shuffleRootPathId.setFid(shuffleRootFid);
        shuffleRootPathId.setIps(fileId.getFidServers());
        fileId.close();
        String outputDirName = this.conf.get(MAPR_LOCALOUTPUT_DIR_PARAM, MAPR_LOCALOUTPUT_DIR_DEFAULT);
        String outputUDirName = outputDirName + MAPR_UNCOMPRESSED_SUFFIX;
        String spillDirName = this.conf.get(MAPR_LOCALSPILL_DIR_PARAM, MAPR_LOCALSPILL_DIR_DEFAULT);
        String spillUDirName = spillDirName + MAPR_UNCOMPRESSED_SUFFIX;
        this.metaData.putDirPathId(".", (PathId)shuffleRootPathId);
        LOG.info((Object)("root fid : " + shuffleRootPathId.getFid()));
        PathId oPid = this.createDirAndGetPathId((PathId)shuffleRootPathId, outputDirName);
        this.metaData.putDirPathId(outputDirName, oPid);
        this.fidRoots[FidId.OUTPUT.ordinal()] = oPid.getFid();
        PathId ouPid = this.createDirAndGetPathId((PathId)shuffleRootPathId, outputUDirName);
        this.metaData.putDirPathId(outputUDirName, ouPid);
        this.fidRoots[FidId.OUTPUT_U.ordinal()] = ouPid.getFid();
        PathId sPid = this.createDirAndGetPathId((PathId)shuffleRootPathId, spillDirName);
        this.metaData.putDirPathId(spillDirName, sPid);
        this.fidRoots[FidId.SPILL.ordinal()] = sPid.getFid();
        PathId suPid = this.createDirAndGetPathId((PathId)shuffleRootPathId, spillUDirName);
        this.metaData.putDirPathId(spillUDirName, suPid);
        this.fidRoots[FidId.SPILL_U.ordinal()] = suPid.getFid();
    }

    private PathId createDirAndGetPathId(PathId parentPathId, String dirName) throws IOException {
        String dirFid = this.maprfs.mkdirsFid(parentPathId.getFid(), dirName);
        MapRPathId dirPathId = new MapRPathId();
        dirPathId.setFid(dirFid);
        dirPathId.setIps(parentPathId.getIPs());
        LOG.info((Object)(dirName + " fid : " + dirFid));
        return dirPathId;
    }

    @VisibleForTesting
    String getMapRedLocalVolumeMountPath() {
        return this.conf.get("mapr.mapred.localvolume.mount.path", "/var/mapr/local/" + LOCALHOSTNAME + "/mapred");
    }

    @VisibleForTesting
    String getNodeManagerDirPath() {
        return this.conf.get("mapr.mapred.localvolume.root.dir.path", this.getMapRedLocalVolumeMountPath() + "/nodeManager");
    }

    private class VolumeHealthCheckTask
    implements Runnable {
        private VolumeHealthCheckTask() {
        }

        @Override
        public void run() {
            Path volumePath = new Path(LocalVolumeAuxService.this.getMapRedLocalVolumeMountPath());
            Path nmPath = new Path(LocalVolumeAuxService.this.getNodeManagerDirPath());
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Checking mapreduce volume " + volumePath));
                }
                LocalVolumeAuxService.this.maprfs.getFileStatus(volumePath);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Done checking mapreduce volume " + volumePath));
                }
                LocalVolumeAuxService.this.maprfs.getFileStatus(nmPath);
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, LocalVolumeAuxService.this.rootDirNames[FidId.OUTPUT.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, LocalVolumeAuxService.this.rootDirNames[FidId.OUTPUT_U.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, LocalVolumeAuxService.this.rootDirNames[FidId.SPILL.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, LocalVolumeAuxService.this.rootDirNames[FidId.SPILL_U.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, "fidservers"));
            }
            catch (Exception e) {
                LOG.error((Object)("Failed to get status of mapreduce volume " + volumePath + " or it's subdirectories"), (Throwable)e);
                try {
                    LocalVolumeAuxService.this.initVolume();
                }
                catch (Exception innerEx) {
                    // empty catch block
                }
            }
        }
    }

    private class LocalVolumeAuxServiceShutDownHook
    implements Runnable {
        private final LocalVolumeAuxService service;

        private LocalVolumeAuxServiceShutDownHook(LocalVolumeAuxService service) {
            this.service = service;
        }

        @Override
        public void run() {
            this.service.isShuttingDown = true;
        }
    }

    static enum FidId {
        ROOT,
        OUTPUT,
        OUTPUT_U,
        SPILL,
        SPILL_U;

    }
}

