/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathId;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MapRDirectShuffleMetaData;
import org.apache.hadoop.maprfs.AbstractMapRFileSystem;
import org.apache.hadoop.maprfs.MapRPathId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.BaseMapRUtil;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.util.ScramCredentialScriptUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalVolumeAuxService
extends AuxiliaryService {
    /** Logger shared by the service and its nested tasks. */
    private static final Logger LOG = LoggerFactory.getLogger(LocalVolumeAuxService.class);
    /** Service name passed to the {@link AuxiliaryService} constructor. */
    private static final String SERVICE_NAME = "direct_shuffle";
    /** MapR installation directory; prefix for the volume-creation script path. */
    private static final String MAPR_INSTALL_DIR = BaseMapRUtil.getPathToMaprHome();
    /** Host name of this node as reported by {@link BaseMapRUtil}; {@code serviceInit} fails when it is null. */
    private static final String LOCALHOSTNAME = BaseMapRUtil.getMapRHostName();
    /** Path of the volume-creation script, relative to {@link #MAPR_INSTALL_DIR}. */
    private static final String LOCAL_VOLUME_CREATE_SCRIPT_PATH = "/server/createLocalVolumes.sh";
    /** Not referenced by the code in this class; the error message in {@code executeCommand} spells the same path out literally. */
    private static final String LOCAL_VOLUME_CREATE_SCRIPT_LOGFILE_PATH = "/logs/createNMVolume.log";
    /** Last script argument for the mapreduce volume; also the value of the MAPR_MAPREDUCE_MODE environment variable. */
    private static final String YARN_ARG = "yarn";
    /** Last script argument for the staging volume. */
    private static final String STAGING_ARG = "staging";
    /** Last script argument for the spark volume. */
    private static final String SPARK_ARG = "spark";
    /** Configuration key for the period of the volume health check (passed to the scheduler as milliseconds). */
    private static final String VOLUME_HEALTH_CHECK_INTERVAL = "mapreduce.volume.healthcheck.interval";
    /** Configuration key read into {@link #spillExpirationDate}. */
    private static final String SPILL_EXPIRATION_DATE = "mapr.localspill.expiration.date";
    /** Shutdown-hook priority for this service; {@code serviceInit} registers its hook with the same value. */
    public static final int LOCAL_VOL_SERVICE_SHUTDOWN_HOOK_PRIORITY = 40;
    /** Fids of the shuffle root and its sub-directories, indexed by {@link FidId#ordinal()}; filled by {@code initMapReduceDirs}. */
    private String[] fidRoots;
    /** Directory names of the shuffle root and its sub-directories, indexed by {@link FidId#ordinal()}; filled by {@code serviceInit}. */
    private String[] rootDirNames;
    /** Configuration set by {@code serviceInit} or {@code setConf}. */
    private Configuration conf;
    /** File system used for all fid operations; obtained in {@code serviceInit} unless injected via {@code setFS}. */
    private FileSystem maprfs;
    /** Service-level metadata returned by {@link #getMetaData()}. */
    protected MapRDirectShuffleMetaData metaData;
    /** Per-job metadata keyed by job id string; added in {@code initializeApplication}, removed by the cleanup task of {@code stopApplication}. */
    protected final Map<String, MapRDirectShuffleMetaData> jobMetaData = new ConcurrentHashMap<String, MapRDirectShuffleMetaData>();
    /** Set to true by the shutdown hook; makes {@code stopApplication} skip directory cleanup. */
    private volatile boolean isShuttingDown = false;
    /** Lock around the fid roots: write side held by {@code initMapReduceDirs}, read side by per-job create/delete. */
    private final ReentrantReadWriteLock healthLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.healthLock.readLock();
    private final Lock writeLock = this.healthLock.writeLock();
    /** Single-threaded scheduler that runs the periodic volume health check. */
    private ScheduledExecutorService volumeChecker;
    /** Pool that runs per-job directory cleanup and its delayed retries. */
    private ScheduledThreadPoolExecutor deletionService;
    /** Health-check period and initial delay; default 60000, overridable through {@link #VOLUME_HEALTH_CHECK_INTERVAL}. */
    private int volumeCheckInterval = 60000;
    /** Default 30, overridable through {@link #SPILL_EXPIRATION_DATE}; only assigned, never read, in this class (unit not visible here). */
    private int spillExpirationDate = 30;

    /**
     * Creates the service under the name {@code direct_shuffle}, together with
     * the scheduler for the volume health check and an empty service-level
     * metadata object. Nothing is scheduled until {@code serviceStart}.
     */
    protected LocalVolumeAuxService() {
        super(SERVICE_NAME);
        this.volumeChecker = Executors.newSingleThreadScheduledExecutor();
        this.metaData = new MapRDirectShuffleMetaData();
    }

    /**
     * Injects the file system to use. When one is set before
     * {@code serviceInit} runs, {@code serviceInit} keeps it instead of
     * calling {@code FileSystem.get}.
     *
     * @param fs file system used for all subsequent fid operations
     */
    @VisibleForTesting
    protected void setFS(FileSystem fs) {
        this.maprfs = fs;
    }

    /**
     * Injects the configuration read by the path and directory-name getters.
     * {@code serviceInit} overwrites it with its own argument.
     *
     * @param conf configuration to use
     */
    @VisibleForTesting
    protected void setConf(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Initializes the service: records the configuration, obtains the file
     * system (unless one was injected through {@code setFS}), resolves the
     * shuffle directory names, reads the tunables, creates the deletion
     * thread pool and registers the shutdown hook.
     *
     * @param conf service configuration; must not be null
     * @throws ServiceStateException if the local MapR host name could not be determined
     * @throws Exception if the superclass initialization or the file system lookup fails
     */
    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        if (LOCALHOSTNAME == null) {
            throw new ServiceStateException("Cannot initialize direct_shuffle. Error obtaining hostname from " + BaseMapRUtil.HOST_NAME_FILE_PATH + ".");
        }
        this.conf = conf;
        if (this.maprfs == null) {
            this.maprfs = FileSystem.get(conf);
        }

        // Directory names are indexed by FidId ordinal so they line up with fidRoots.
        String outputDirName = conf.get("mapr.localoutput.dir", "output");
        String spillDirName = this.getSpillDirName();
        String[] dirNames = new String[FidId.values().length];
        dirNames[FidId.ROOT.ordinal()] = ".";
        dirNames[FidId.OUTPUT.ordinal()] = outputDirName;
        dirNames[FidId.OUTPUT_U.ordinal()] = outputDirName + ".U";
        dirNames[FidId.SPILL.ordinal()] = spillDirName;
        dirNames[FidId.SPILL_U.ordinal()] = spillDirName + ".U";
        this.rootDirNames = dirNames;

        this.metaData.setNodeManageHostName(LOCALHOSTNAME);
        this.volumeCheckInterval = conf.getInt(VOLUME_HEALTH_CHECK_INTERVAL, this.volumeCheckInterval);
        this.spillExpirationDate = conf.getInt(SPILL_EXPIRATION_DATE, this.spillExpirationDate);

        // conf has already been dereferenced above, so no null check is needed here.
        int deleteThreads = conf.getInt("yarn.nodemanager.delete.thread-count", 4);
        this.deletionService = new ScheduledThreadPoolExecutor(deleteThreads);
        // Pending delayed retries are dropped rather than run once the pool is shut down.
        this.deletionService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.deletionService.setKeepAliveTime(60L, TimeUnit.SECONDS);

        ShutdownHookManager.get().addShutdownHook((Runnable)new LocalVolumeAuxServiceShutDownHook(this), LOCAL_VOL_SERVICE_SHUTDOWN_HOOK_PRIORITY);
    }

    /**
     * Creates the local volumes and shuffle directories, then schedules the
     * periodic volume health check. The configured interval is used both as
     * the initial delay and as the period, in milliseconds.
     *
     * @throws Exception if volume or directory initialization fails
     */
    protected void serviceStart() throws Exception {
        this.initVolume();
        final Runnable healthCheck = new VolumeHealthCheckTask();
        final long periodMillis = this.volumeCheckInterval;
        this.volumeChecker.scheduleAtFixedRate(healthCheck, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs the volume-creation script for the mapreduce and staging volumes
     * (and for the spark volume when enabled in the configuration), copies
     * SCRAM credentials when SCRAM-SHA-256 token authentication is configured,
     * and finally (re)creates the shuffle directories.
     *
     * @throws IOException if a script invocation or the directory setup fails
     */
    private void initVolume() throws IOException {
        final String createScript = MAPR_INSTALL_DIR + LOCAL_VOLUME_CREATE_SCRIPT_PATH;
        final String[] mapReduceCmd = {
            createScript, LOCALHOSTNAME, this.getMapRedLocalVolumeMountPath(), this.getNodeManagerDirPath(), YARN_ARG
        };
        final String stagingMount = this.getStagingLocalVolumeMountPath();
        final String[] stagingCmd = {
            createScript, LOCALHOSTNAME, stagingMount, stagingMount + "/nodeManager", STAGING_ARG
        };

        final Map<String, String> scriptEnv = new HashMap<String, String>();
        scriptEnv.put("MAPR_MAPREDUCE_MODE", YARN_ARG);
        this.executeCommand(mapReduceCmd, scriptEnv);
        this.executeCommand(stagingCmd, scriptEnv);

        final String tokenAuthMethod = this.conf.get("hadoop.security.token.authentication.method", "DIGEST-MD5");
        if (tokenAuthMethod.equalsIgnoreCase("SCRAM-SHA-256")) {
            ScramCredentialScriptUtil.checkAndCopyScramCreds((Configuration)this.conf, (String)"nodeManager");
        }

        final boolean createSparkVolume = this.conf.getBoolean(YarnConfiguration.NM_CREATE_SPARK_VOLUME, YarnConfiguration.DEFAULT_NM_CREATE_SPARK_VOLUME);
        if (createSparkVolume) {
            final String sparkMount = this.getSparkLocalVolumeMountPath();
            final String[] sparkCmd = {
                createScript, LOCALHOSTNAME, sparkMount, sparkMount + "/nodeManager", SPARK_ARG
            };
            this.executeCommand(sparkCmd, scriptEnv);
        }

        try {
            this.initMapReduceDirs();
        }
        catch (IOException dirFailure) {
            // Log here for context, then let the caller decide how to react.
            LOG.error("Could not initialize directories for mapreduce", (Throwable)dirFailure);
            throw dirFailure;
        }
    }

    /**
     * Runs one invocation of the volume-creation script and logs the outcome.
     *
     * @param args full command line; element 2 is reported in the log messages as the mount path
     * @param env  environment variables for the child process
     * @throws IOException rethrown unchanged when the command execution fails
     */
    private void executeCommand(String[] args, Map<String, String> env) throws IOException {
        final Shell.ShellCommandExecutor executor = new Shell.ShellCommandExecutor(args, null, env);
        LOG.info("Checking for local volume. If volume is not present command will create and mount it. Command invoked is : {}", (Object)executor.toString());
        try {
            executor.execute();
        }
        catch (IOException failure) {
            // Details are logged only when the script itself reported a non-zero exit code.
            if (executor.getExitCode() != 0) {
                LOG.error("Failed to create and mount local volume at {}. Please see logs at {}/logs/createNMVolume.log", (Object)args[2], (Object)MAPR_INSTALL_DIR);
                LOG.error("Command ran {}", (Object)executor.toString());
                LOG.error("Command output {}", (Object)executor.getOutput());
            }
            throw failure;
        }
        LOG.info("Successfully created volume and mounted at {}", (Object)args[2]);
    }

    /**
     * Stops the health-check scheduler and the deletion pool. The deletion
     * pool gets up to ten seconds to finish; if it does not terminate in
     * time, or the wait is interrupted, it is shut down forcibly.
     */
    protected void serviceStop() throws Exception {
        LOG.info("Shutting down Volume Checker service");
        this.volumeChecker.shutdown();

        if (this.deletionService == null) {
            return;
        }
        LOG.info("Cleanup service shutdown");
        this.deletionService.shutdown();

        boolean drained;
        try {
            drained = this.deletionService.awaitTermination(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException interrupted) {
            // Preserve the interrupt for the caller and fall through to the forced shutdown.
            Thread.currentThread().interrupt();
            drained = false;
        }
        if (!drained) {
            this.deletionService.shutdownNow();
        }
    }

    /*
     * Decompiler note (CFR): a try block that caught its own handler was
     * removed here, so exception handling may differ slightly from the
     * original source.
     */
    /**
     * Creates the per-job directories under every shuffle root except ROOT,
     * assigns them to the job user (and the user's first group, if any), and
     * stores the resulting path ids as the job's metadata. Does nothing when
     * metadata for the job already exists. An {@link IOException} is logged
     * and not propagated.
     *
     * @param context supplies the application id and the submitting user
     */
    public void initializeApplication(ApplicationInitializationContext context) {
        final String jobUser = context.getUser();
        final ApplicationId appId = context.getApplicationId();
        final String jobIdStr = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()).toString();
        LOG.debug("In initializeApplication. Application Id: {}, Job Id: {}", (Object)appId.getId(), (Object)jobIdStr);

        if (this.jobMetaData.get(jobIdStr) != null) {
            LOG.debug("Fids for job: {} already created. skipping initializeApplication.", (Object)jobIdStr);
            return;
        }
        LOG.info("initializeApplication for job: {} and user: {}", (Object)jobIdStr, (Object)jobUser);

        final String[] groupNames = UserGroupInformation.createRemoteUser((String)jobUser).getGroupNames();
        final String jobUserGroup = (groupNames != null && groupNames.length > 0) ? groupNames[0] : null;
        LOG.debug("User: {} Group: {}", (Object)jobUser, jobUserGroup);

        // The read lock keeps the fid roots stable while the job directories are created.
        this.readLock.lock();
        try {
            final int fidCount = this.fidRoots.length;
            final int rootIdx = FidId.ROOT.ordinal();
            final String rootDirName = this.rootDirNames[rootIdx];

            final MapRDirectShuffleMetaData jobData = new MapRDirectShuffleMetaData();
            final PathId shuffleRootPathId = this.metaData.getMapReduceDirsPathIds().get(rootDirName);
            jobData.putDirPathId(rootDirName, shuffleRootPathId);
            jobData.setNodeManageHostName(LOCALHOSTNAME);

            final FidId[] kinds = FidId.values();
            for (int idx = rootIdx + 1; idx < fidCount; ++idx) {
                final String jobDirFid = this.maprfs.mkdirsFid(this.fidRoots[idx], jobIdStr);
                final MapRPathId jobDirPathId = new MapRPathId();
                jobDirPathId.setFid(jobDirFid);
                jobDirPathId.setIps(shuffleRootPathId.getIPs());
                jobData.putDirPathId(this.rootDirNames[idx], (PathId)jobDirPathId);
                LOG.debug("{} fid for {}: {}", new Object[]{kinds[idx].name(), jobIdStr, jobDirFid});
                this.maprfs.setOwnerFid(jobDirFid, jobUser, jobUserGroup);
            }
            this.jobMetaData.put(jobIdStr, jobData);
        }
        catch (IOException ioFailure) {
            LOG.error("Error during initializeApplication. App Id: " + appId.getId() + ", Job Id: " + jobIdStr, (Throwable)ioFailure);
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Schedules asynchronous removal of the job's directories from every
     * shuffle root except ROOT and drops the job's metadata afterwards.
     * When a directory cannot be deleted, one more attempt is scheduled
     * three hours later. Nothing is deleted while the NodeManager is
     * shutting down.
     *
     * @param stopAppContext supplies the id of the application that finished
     */
    public void stopApplication(ApplicationTerminationContext stopAppContext) {
        ApplicationId appId = stopAppContext.getApplicationId();
        JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
        final String jobIdStr = jobId.toString();
        if (this.isShuttingDown) {
            LOG.info("NodeManager is shutting down but {}/{} might still be running. Not cleaning up the {} directory in the local volume.", new Object[]{appId.toString(), jobIdStr, jobIdStr});
            return;
        }
        Runnable filesRemoverTask = new Runnable(){

            @Override
            public void run() {
                // The read lock keeps the fid roots stable while they are walked.
                LocalVolumeAuxService.this.readLock.lock();
                try {
                    for (int i = FidId.ROOT.ordinal() + 1; i < LocalVolumeAuxService.this.fidRoots.length; ++i) {
                        final String fidRoot = LocalVolumeAuxService.this.fidRoots[i];
                        final FidId fidId = FidId.values()[i];
                        if (LocalVolumeAuxService.this.maprfs.deleteFid(fidRoot, jobIdStr)) {
                            LOG.debug("Deleted {} from {}", (Object)jobIdStr, (Object)fidId);
                            continue;
                        }
                        LOG.warn("{} was failed to delete from {}. Parent Fid: {}. There will be another attempt after 3 hours.", new Object[]{jobIdStr, fidId, fidRoot});
                        LocalVolumeAuxService.this.deletionService.schedule(new Runnable(){

                            @Override
                            public void run() {
                                try {
                                    if (!LocalVolumeAuxService.this.maprfs.deleteFid(fidRoot, jobIdStr)) {
                                        // Last attempt: report it, otherwise the leftover directory goes unnoticed.
                                        LOG.warn("{} could not be deleted from {} on retry. Parent Fid: {}", new Object[]{jobIdStr, fidId, fidRoot});
                                    }
                                }
                                catch (Exception e) {
                                    // Trailing throwable is logged with its stack trace; anything
                                    // escaping here would be swallowed by the scheduled future.
                                    LOG.warn("{} could not be deleted from {}. Parent Fid: {}", new Object[]{jobIdStr, fidId, fidRoot, e});
                                }
                            }
                        }, 3L, TimeUnit.HOURS);
                    }
                }
                catch (Throwable t) {
                    LOG.error("Error during removing localvolume data for Job Id: {}", (Object)jobIdStr, (Object)t);
                }
                finally {
                    LocalVolumeAuxService.this.readLock.unlock();
                    LocalVolumeAuxService.this.jobMetaData.remove(jobIdStr);
                }
            }
        };
        this.deletionService.schedule(filesRemoverTask, 0L, TimeUnit.SECONDS);
    }

    /**
     * Serializes the service-level shuffle metadata.
     *
     * @return a buffer wrapping the serialized bytes, or {@code null} when serialization fails
     */
    public ByteBuffer getMetaData() {
        final DataOutputBuffer serialized = new DataOutputBuffer();
        try {
            this.metaData.write((DataOutput)serialized);
        }
        catch (IOException writeFailure) {
            LOG.error("Encountered error while returning metadata.", (Throwable)writeFailure);
            return null;
        }
        return ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
    }

    /**
     * Serializes the metadata of the job that owns the given container.
     * Falls back to the service-level metadata when the application id cannot
     * be resolved from the context or no metadata is registered for the job.
     *
     * @param context container context used to look up the owning application; may be null
     * @return a buffer wrapping the serialized bytes, or {@code null} when serialization fails
     */
    public ByteBuffer getMetaData(ContainerInitializationContext context) {
        final boolean hasApplicationId = context != null
                && context.getContainerId() != null
                && context.getContainerId().getApplicationAttemptId() != null
                && context.getContainerId().getApplicationAttemptId().getApplicationId() != null;
        if (!hasApplicationId) {
            LOG.warn("context is null in getMetaData for context. Returning service metadata");
            return this.getMetaData();
        }

        final ApplicationId appId = context.getContainerId().getApplicationAttemptId().getApplicationId();
        final String jobIdStr = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId()).toString();
        final MapRDirectShuffleMetaData jobData = this.jobMetaData.get(jobIdStr);
        if (jobData == null) {
            LOG.debug("Can not find metadata for a job. Returning service metadata");
            return this.getMetaData();
        }

        final DataOutputBuffer serialized = new DataOutputBuffer();
        try {
            jobData.write((DataOutput)serialized);
        }
        catch (IOException writeFailure) {
            LOG.error("Encountered error while returning metadata.", (Throwable)writeFailure);
            return null;
        }
        return ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
    }

    /*
     * Decompiler note (CFR): a try block that caught its own handler was
     * removed here, so exception handling may differ slightly from the
     * original source.
     */
    /**
     * Creates the NodeManager shuffle root directory, the {@code fidservers}
     * file inside it and the four output/spill sub-directories, then records
     * their path ids in the service metadata and their fids in the fid roots.
     * Runs under the write lock so per-job create/delete work, which takes the
     * read lock, is excluded while the roots are rebuilt.
     *
     * @throws IOException if any file system operation fails
     * @throws UnsupportedOperationException if the file system is not an {@link AbstractMapRFileSystem}
     */
    protected void initMapReduceDirs() throws IOException {
        this.writeLock.lock();
        try {
            String shuffleRootFid = this.maprfs.mkdirsFid(new Path(this.getNodeManagerDirPath()));
            if (!(this.maprfs instanceof AbstractMapRFileSystem)) {
                throw new UnsupportedOperationException("This is not MapRFileSystem implementation, so can not use createFid method");
            }
            MapRPathId shuffleRootPathId = new MapRPathId();
            shuffleRootPathId.setFid(shuffleRootFid);
            // try-with-resources closes the stream even when reading the fid servers fails.
            try (FSDataOutputStream fileId = ((AbstractMapRFileSystem)this.maprfs).createFid(shuffleRootFid, "fidservers", true)) {
                shuffleRootPathId.setIps(fileId.getFidServers());
            }

            String outputDirName = this.rootDirNames[FidId.OUTPUT.ordinal()];
            String outputUDirName = this.rootDirNames[FidId.OUTPUT_U.ordinal()];
            String spillDirName = this.rootDirNames[FidId.SPILL.ordinal()];
            String spillUDirName = this.rootDirNames[FidId.SPILL_U.ordinal()];
            PathId oPid = this.createDirAndGetPathId((PathId)shuffleRootPathId, outputDirName);
            PathId ouPid = this.createDirAndGetPathId((PathId)shuffleRootPathId, outputUDirName);
            PathId sPid = this.createDirAndGetPathId((PathId)shuffleRootPathId, spillDirName);
            PathId suPid = this.createDirAndGetPathId((PathId)shuffleRootPathId, spillUDirName);

            // All directories exist at this point; only now touch the shared state.
            this.metaData.putDirPathId(".", (PathId)shuffleRootPathId);
            LOG.info("root fid : " + shuffleRootPathId.getFid());
            this.metaData.putDirPathId(outputDirName, oPid);
            this.metaData.putDirPathId(outputUDirName, ouPid);
            this.metaData.putDirPathId(spillDirName, sPid);
            this.metaData.putDirPathId(spillUDirName, suPid);

            // Build the array locally and publish it in a single assignment.
            String[] roots = new String[FidId.values().length];
            roots[FidId.ROOT.ordinal()] = shuffleRootFid;
            roots[FidId.OUTPUT.ordinal()] = oPid.getFid();
            roots[FidId.OUTPUT_U.ordinal()] = ouPid.getFid();
            roots[FidId.SPILL.ordinal()] = sPid.getFid();
            roots[FidId.SPILL_U.ordinal()] = suPid.getFid();
            this.fidRoots = roots;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Creates {@code dirName} under the directory identified by
     * {@code parentPathId} and returns a path id that carries the new
     * directory's fid together with the parent's IPs.
     *
     * @param parentPathId path id of the parent directory
     * @param dirName      name of the directory to create
     * @return path id of the created directory
     * @throws IOException if the directory cannot be created
     */
    private PathId createDirAndGetPathId(PathId parentPathId, String dirName) throws IOException {
        final String childFid = this.maprfs.mkdirsFid(parentPathId.getFid(), dirName);
        final MapRPathId childPathId = new MapRPathId();
        childPathId.setFid(childFid);
        childPathId.setIps(parentPathId.getIPs());
        LOG.info(dirName + " fid : " + childFid);
        return childPathId;
    }

    /**
     * @return the mount path of the mapreduce local volume: the value of
     *         {@code mapr.mapred.localvolume.mount.path}, or the per-host
     *         default under {@code /var/mapr/local} when it is not set
     */
    @VisibleForTesting
    String getMapRedLocalVolumeMountPath() {
        final String perHostDefault = "/var/mapr/local/" + LOCALHOSTNAME + "/mapred";
        return this.conf.get("mapr.mapred.localvolume.mount.path", perHostDefault);
    }

    /**
     * @return the shuffle root directory: the value of
     *         {@code mapr.mapred.localvolume.root.dir.path}, or
     *         {@code nodeManager} under the mapreduce volume mount path when
     *         it is not set
     */
    @VisibleForTesting
    String getNodeManagerDirPath() {
        final String underMapRedVolume = this.getMapRedLocalVolumeMountPath() + "/nodeManager";
        return this.conf.get("mapr.mapred.localvolume.root.dir.path", underMapRedVolume);
    }

    /**
     * @return the mount path of the staging local volume: the value of
     *         {@code mapr.staging.localvolume.mount.path}, or the per-host
     *         default under {@code /var/mapr/local} when it is not set
     */
    @VisibleForTesting
    String getStagingLocalVolumeMountPath() {
        final String perHostDefault = "/var/mapr/local/" + LOCALHOSTNAME + "/nm-staging";
        return this.conf.get("mapr.staging.localvolume.mount.path", perHostDefault);
    }

    /**
     * @return the mount path of the spark local volume: the value of
     *         {@code mapr.spark.localvolume.mount.path}, or the per-host
     *         default under {@code /var/mapr/local} when it is not set
     */
    @VisibleForTesting
    String getSparkLocalVolumeMountPath() {
        final String perHostDefault = "/var/mapr/local/" + LOCALHOSTNAME + "/spark";
        return this.conf.get("mapr.spark.localvolume.mount.path", perHostDefault);
    }

    /**
     * @return the spill directory name: the value of
     *         {@code mapr.localspill.dir}, or {@code spill} when it is not set
     */
    private String getSpillDirName() {
        final String key = "mapr.localspill.dir";
        final String fallback = "spill";
        return this.conf.get(key, fallback);
    }

    private class VolumeHealthCheckTask
    implements Runnable {
        private VolumeHealthCheckTask() {
        }

        @Override
        public void run() {
            Path volumePath = new Path(LocalVolumeAuxService.this.getMapRedLocalVolumeMountPath());
            Path nmPath = new Path(LocalVolumeAuxService.this.getNodeManagerDirPath());
            try {
                LOG.debug("Checking mapreduce volume {}", (Object)volumePath);
                LocalVolumeAuxService.this.maprfs.getFileStatus(volumePath);
                LOG.debug("Done checking mapreduce volume {}", (Object)volumePath);
                LocalVolumeAuxService.this.maprfs.getFileStatus(nmPath);
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, LocalVolumeAuxService.this.rootDirNames[FidId.OUTPUT.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, LocalVolumeAuxService.this.rootDirNames[FidId.OUTPUT_U.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, LocalVolumeAuxService.this.rootDirNames[FidId.SPILL.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, LocalVolumeAuxService.this.rootDirNames[FidId.SPILL_U.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(nmPath, "fidservers"));
            }
            catch (Exception e) {
                LOG.error("Failed to get status of mapreduce volume " + volumePath + " or it's subdirectories. Trying to recover", (Throwable)e);
                try {
                    LocalVolumeAuxService.this.initVolume();
                }
                catch (Exception innerEx) {
                    LOG.warn("Exception is thrown while trying to recover local volume. If exception persists for considerable amount of time you may need to restart NM");
                }
            }
        }
    }

    /**
     * Shutdown hook registered by {@code serviceInit}. It only flags the
     * service as shutting down, which makes {@code stopApplication} skip the
     * cleanup of job directories.
     */
    private class LocalVolumeAuxServiceShutDownHook
    implements Runnable {
        /** Service whose shutting-down flag is set when the hook runs. */
        private final LocalVolumeAuxService service;

        private LocalVolumeAuxServiceShutDownHook(LocalVolumeAuxService service) {
            this.service = service;
        }

        @Override
        public void run() {
            this.service.isShuttingDown = true;
        }
    }

    /**
     * Identifies the shuffle root and its sub-directories. The ordinal of
     * each constant is used as the index into {@code fidRoots} and
     * {@code rootDirNames}, so the declaration order must not change.
     */
    static enum FidId {
        // Shuffle root directory itself (directory name ".").
        ROOT,
        // Output directory (name from "mapr.localoutput.dir", default "output").
        OUTPUT,
        // Output directory name with the ".U" suffix.
        OUTPUT_U,
        // Spill directory (name from "mapr.localspill.dir", default "spill").
        SPILL,
        // Spill directory name with the ".U" suffix.
        SPILL_U;

    }
}

