package com.mapr.hadoop.mapred;

import com.google.common.annotations.VisibleForTesting;
import com.mapr.baseutils.BaseUtilsHelper;
import com.mapr.fs.MapRPathId;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathId;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-kms-2.7.0-mapr-1506/share/hadoop/kms/tomcat/webapps/kms/WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService.class
  input_file:kms/WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService.class
 */
/* loaded from: input_file:kms.war:WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService.class */
public class LocalVolumeAuxService extends AuxiliaryService {
    /** Name this auxiliary service is constructed with. */
    private static final String SERVICE_NAME = "direct_shuffle";
    /** Script (relative to the MapR install dir) that initVolume() runs to create and mount the local volume. */
    private static final String LOCAL_VOLUME_CREATE_SCRIPT_PATH = "/server/createTTVolume.sh";
    /** Log file (relative to the MapR install dir) that the volume-creation error message points to. */
    private static final String LOCAL_VOLUME_CREATE_SCRIPT_LOGFILE_PATH = "/logs/createNMVolume.log";
    /** Last argument of the volume script and value of its MAPR_MAPREDUCE_MODE environment variable. */
    private static final String YARN_ARG = "yarn";
    /** Configuration key and default for the name of the map-output directory. */
    private static final String MAPR_LOCALOUTPUT_DIR_PARAM = "mapr.localoutput.dir";
    private static final String MAPR_LOCALOUTPUT_DIR_DEFAULT = "output";
    /** Configuration key and default for the name of the spill directory. */
    private static final String MAPR_LOCALSPILL_DIR_PARAM = "mapr.localspill.dir";
    private static final String MAPR_LOCALSPILL_DIR_DEFAULT = "spill";
    /** Suffix appended to the output/spill directory names to form their ".U" sibling directories. */
    private static final String MAPR_UNCOMPRESSED_SUFFIX = ".U";
    /** Configuration key for the volume health-check interval, in milliseconds. */
    private static final String VOLUME_HEALTH_CHECK_INTERVAL = "mapreduce.volume.healthcheck.interval";
    /** Priority value for the shutdown hook this service registers. */
    public static final int LOCAL_VOL_SERVICE_SHUTDOWN_HOOK_PRIORITY = 40;
    /**
     * Fids of the NodeManager root directory and its output/spill directories, indexed by
     * {@link FidId#ordinal()}. The array is replaced whenever the volume is (re-)initialized, which
     * can happen on the health-check thread, while application init and cleanup read it from other
     * threads; {@code volatile} makes a replaced array visible to those readers.
     */
    private volatile String[] fidRoots;
    /** Directory names (relative to the NodeManager root directory), indexed by {@link FidId#ordinal()}. */
    private String[] rootDirNames;
    /** Service configuration, stored by serviceInit(). */
    private Configuration conf;
    /** File system obtained from the service configuration in serviceInit(). */
    private FileSystem maprfs;
    /** Service-wide shuffle metadata; serialized by the no-argument getMetaData(). */
    private MapRDirectShuffleMetaData metaData;
    /** Per-job shuffle metadata, keyed by the job id string. */
    private final Map<String, MapRDirectShuffleMetaData> jobMetaData;
    /** Set to true by the shutdown hook; stopApplication() skips directory cleanup once it is set. */
    private volatile boolean isShuttingDown;
    /** Runs the periodic volume health check. */
    private ScheduledExecutorService volumeChecker;
    /** Runs the per-job directory deletions scheduled by stopApplication(). */
    private ScheduledThreadPoolExecutor deletionService;
    /** Initial delay and period of the volume health check, in milliseconds. */
    private int volumeCheckInterval;
    private static final Log LOG = LogFactory.getLog(LocalVolumeAuxService.class);
    /** MapR installation directory, as reported by BaseUtilsHelper. */
    private static final String MAPR_INSTALL_DIR = BaseUtilsHelper.getPathToMaprHome();
    /** Host name of this node, as reported by BaseUtilsHelper; serviceInit() fails if it is null. */
    private static final String LOCALHOSTNAME = BaseUtilsHelper.getMapRHostName();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-kms-2.7.0-mapr-1506/share/hadoop/kms/tomcat/webapps/kms/WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$FidId.class
      input_file:kms/WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$FidId.class
     */
    /* loaded from: input_file:kms.war:WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$FidId.class */
    /**
     * Indexes into the per-directory arrays kept by the service (directory names and fids).
     * The declaration order is significant: ordinals are used directly as array indexes.
     */
    public enum FidId {
        /** The NodeManager root directory itself ("."). */
        ROOT,

        /** The map-output directory. */
        OUTPUT,

        /** The ".U" sibling of the map-output directory. */
        OUTPUT_U,

        /** The spill directory. */
        SPILL,

        /** The ".U" sibling of the spill directory. */
        SPILL_U;
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-kms-2.7.0-mapr-1506/share/hadoop/kms/tomcat/webapps/kms/WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$LocalVolumeAuxServiceShutDownHook.class
      input_file:kms/WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$LocalVolumeAuxServiceShutDownHook.class
     */
    /* loaded from: input_file:kms.war:WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$LocalVolumeAuxServiceShutDownHook.class */
    /**
     * Shutdown hook that flags the given service as shutting down, so that applications stopped
     * from then on do not get their job directories deleted.
     */
    private class LocalVolumeAuxServiceShutDownHook implements Runnable {
        /** Service whose shutdown flag is raised when the hook runs. */
        private final LocalVolumeAuxService target;

        private LocalVolumeAuxServiceShutDownHook(LocalVolumeAuxService target) {
            this.target = target;
        }

        /** Marks the target service as shutting down. */
        @Override // java.lang.Runnable
        public void run() {
            target.isShuttingDown = true;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-kms-2.7.0-mapr-1506/share/hadoop/kms/tomcat/webapps/kms/WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$VolumeHealthCheckTask.class
      input_file:kms/WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$VolumeHealthCheckTask.class
     */
    /* loaded from: input_file:kms.war:WEB-INF/lib/maprfs-5.0.0-mapr.jar:com/mapr/hadoop/mapred/LocalVolumeAuxService$VolumeHealthCheckTask.class */
    private class VolumeHealthCheckTask implements Runnable {
        private VolumeHealthCheckTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Path path = new Path(LocalVolumeAuxService.this.getMapRedLocalVolumeMountPath());
            Path path2 = new Path(LocalVolumeAuxService.this.getNodeManagerDirPath());
            try {
                if (LocalVolumeAuxService.LOG.isDebugEnabled()) {
                    LocalVolumeAuxService.LOG.debug("Checking mapreduce volume " + path);
                }
                LocalVolumeAuxService.this.maprfs.getFileStatus(path);
                if (LocalVolumeAuxService.LOG.isDebugEnabled()) {
                    LocalVolumeAuxService.LOG.debug("Done checking mapreduce volume " + path);
                }
                LocalVolumeAuxService.this.maprfs.getFileStatus(path2);
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(path2, LocalVolumeAuxService.this.rootDirNames[FidId.OUTPUT.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(path2, LocalVolumeAuxService.this.rootDirNames[FidId.OUTPUT_U.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(path2, LocalVolumeAuxService.this.rootDirNames[FidId.SPILL.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(path2, LocalVolumeAuxService.this.rootDirNames[FidId.SPILL_U.ordinal()]));
                LocalVolumeAuxService.this.maprfs.getFileStatus(new Path(path2, "fidservers"));
            } catch (Exception e) {
                LocalVolumeAuxService.LOG.error("Failed to get status of mapreduce volume " + path + " or it's subdirectories", e);
                try {
                    LocalVolumeAuxService.this.initVolume();
                } catch (Exception e2) {
                }
            }
        }
    }

    /**
     * Creates the service under its fixed name with an empty per-job metadata map, empty service
     * metadata, a 60000 ms default health-check interval and a single-threaded health-check
     * scheduler. The health check itself is only scheduled in serviceStart().
     */
    protected LocalVolumeAuxService() {
        super(SERVICE_NAME);
        this.isShuttingDown = false;
        this.volumeCheckInterval = 60000;
        this.jobMetaData = new ConcurrentHashMap<String, MapRDirectShuffleMetaData>();
        this.metaData = new MapRDirectShuffleMetaData();
        this.volumeChecker = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * Initializes the service from the given configuration: resolves the file system, the names
     * of the output/spill directories, the health-check interval and the deletion thread pool,
     * and registers the shutdown hook that flags the service as shutting down.
     *
     * @param configuration service configuration; must not be null
     * @throws ServiceStateException if the local host name could not be obtained
     * @throws Exception if the parent initialization or the file system lookup fails
     */
    protected void serviceInit(Configuration configuration) throws Exception {
        super.serviceInit(configuration);
        if (LOCALHOSTNAME == null) {
            throw new ServiceStateException("Cannot initialize direct_shuffle. Error obtaining hostname from " + BaseUtilsHelper.HOST_NAME_FILE_PATH + ".");
        }
        this.conf = configuration;
        this.maprfs = FileSystem.get(configuration);

        // Directory names, indexed by FidId ordinal.
        final String outputDir = configuration.get(MAPR_LOCALOUTPUT_DIR_PARAM, MAPR_LOCALOUTPUT_DIR_DEFAULT);
        final String spillDir = configuration.get(MAPR_LOCALSPILL_DIR_PARAM, MAPR_LOCALSPILL_DIR_DEFAULT);
        final String[] dirNames = new String[FidId.values().length];
        dirNames[FidId.ROOT.ordinal()] = ".";
        dirNames[FidId.OUTPUT.ordinal()] = outputDir;
        dirNames[FidId.OUTPUT_U.ordinal()] = outputDir + MAPR_UNCOMPRESSED_SUFFIX;
        dirNames[FidId.SPILL.ordinal()] = spillDir;
        dirNames[FidId.SPILL_U.ordinal()] = spillDir + MAPR_UNCOMPRESSED_SUFFIX;
        this.rootDirNames = dirNames;

        this.metaData.setNodeManageHostName(LOCALHOSTNAME);

        // The interval is later used as the period of a fixed-rate schedule, which rejects
        // non-positive values; keep the current default instead of failing at start.
        final int configuredInterval = configuration.getInt(VOLUME_HEALTH_CHECK_INTERVAL, this.volumeCheckInterval);
        if (configuredInterval > 0) {
            this.volumeCheckInterval = configuredInterval;
        } else {
            LOG.warn(VOLUME_HEALTH_CHECK_INTERVAL + " must be positive but is " + configuredInterval + ". Using " + this.volumeCheckInterval + " ms instead.");
        }

        final int deletionThreads = configuration.getInt("yarn.nodemanager.delete.thread-count", 4);
        this.deletionService = new ScheduledThreadPoolExecutor(deletionThreads);
        this.deletionService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.deletionService.setKeepAliveTime(60L, TimeUnit.SECONDS);

        ShutdownHookManager.get().addShutdownHook(new LocalVolumeAuxServiceShutDownHook(this), LOCAL_VOL_SERVICE_SHUTDOWN_HOOK_PRIORITY);
    }

    /**
     * Creates/mounts the local volume and its directories, then schedules the periodic volume
     * health check; the configured interval is used as both initial delay and period.
     *
     * @throws Exception if the volume could not be initialized
     */
    protected void serviceStart() throws Exception {
        initVolume();
        final Runnable healthCheck = new VolumeHealthCheckTask();
        final long intervalMillis = this.volumeCheckInterval;
        this.volumeChecker.scheduleAtFixedRate(healthCheck, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs the volume creation script for this host and then (re-)creates the mapreduce
     * directories on the volume.
     *
     * @throws IOException if the script fails or the directories cannot be initialized; the
     *         failure is logged before it is rethrown (for the script, only when it reported a
     *         non-zero exit code)
     */
    public void initVolume() throws IOException {
        final String mountPath = getMapRedLocalVolumeMountPath();
        final String[] command = {
            MAPR_INSTALL_DIR + LOCAL_VOLUME_CREATE_SCRIPT_PATH,
            LOCALHOSTNAME,
            mountPath,
            getNodeManagerDirPath(),
            YARN_ARG
        };
        final Map<String, String> environment = new HashMap<String, String>();
        environment.put("MAPR_MAPREDUCE_MODE", YARN_ARG);
        final Shell.ShellCommandExecutor script = new Shell.ShellCommandExecutor(command, null, environment);
        LOG.info("Checking for local volume. If volume is not present command will create and mount it. Command invoked is : " + script);

        // Step 1: create and mount the volume.
        try {
            script.execute();
        } catch (IOException scriptFailure) {
            if (script.getExitCode() != 0) {
                LOG.error("Failed to create and mount local mapreduce volume at " + mountPath + ". Please see logs at " + MAPR_INSTALL_DIR + LOCAL_VOLUME_CREATE_SCRIPT_LOGFILE_PATH);
                LOG.error("Command ran " + script);
                LOG.error("Command output " + script.getOutput());
            }
            throw scriptFailure;
        }
        LOG.info("Sucessfully created volume and mounted at " + mountPath);

        // Step 2: create the directories used by mapreduce on the volume.
        try {
            initMapReduceDirs();
        } catch (IOException dirFailure) {
            LOG.error("Could not initialize directories for mapreduce", dirFailure);
            throw dirFailure;
        }
    }

    /**
     * Stops the health-check scheduler and the deletion pool. The deletion pool gets up to ten
     * seconds to finish; if it does not terminate in time, or the wait is interrupted, it is
     * shut down forcibly.
     */
    protected void serviceStop() throws Exception {
        if (LOG.isInfoEnabled()) {
            LOG.info("Shutting down Volume Checker service");
        }
        this.volumeChecker.shutdown();

        if (this.deletionService == null) {
            return;
        }
        LOG.info("Cleanup service shutdown");
        this.deletionService.shutdown();

        boolean terminated;
        try {
            terminated = this.deletionService.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt for the caller and fall through to the forced shutdown.
            Thread.currentThread().interrupt();
            terminated = false;
        }
        if (!terminated) {
            this.deletionService.shutdownNow();
        }
    }

    /**
     * Creates the per-job output/spill directories for the application, hands them over to the
     * application's user (and that user's first group, if any), and records the resulting
     * per-job shuffle metadata. Does nothing when metadata for the job already exists.
     *
     * <p>I/O failures, as well as missing volume directories, are logged and leave the job
     * without per-job metadata; they are not propagated to the caller.
     *
     * @param applicationInitializationContext supplies the application id and the user
     */
    public void initializeApplication(ApplicationInitializationContext applicationInitializationContext) {
        final String user = applicationInitializationContext.getUser();
        final ApplicationId applicationId = applicationInitializationContext.getApplicationId();
        final String jobID = new JobID(Long.toString(applicationId.getClusterTimestamp()), applicationId.getId()).toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug("In initializeApplication. Application Id: " + applicationId.getId() + ", Job Id: " + jobID);
        }
        if (this.jobMetaData.get(jobID) != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Fids for job: " + jobID + " already created. skipping initializeApplication.");
            }
            return;
        }
        LOG.info("initializeApplication for job: " + jobID + " and user: " + user);

        String group = null;
        final String[] groupNames = UserGroupInformation.createRemoteUser(user).getGroupNames();
        if (groupNames != null && groupNames.length > 0) {
            group = groupNames[0];
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("User: " + user + " Group: " + group);
        }

        try {
            // Work on one snapshot: the health check may replace the array while this method runs.
            final String[] roots = this.fidRoots;
            final String rootDirName = this.rootDirNames[FidId.ROOT.ordinal()];
            final PathId rootPathId = this.metaData.getMapReduceDirsPathIds().get(rootDirName);
            if (roots == null || rootPathId == null) {
                LOG.error("Error during initializeApplication. App Id: " + applicationId.getId() + ", Job Id: " + jobID
                        + ". Local volume directories are not initialized.");
                return;
            }

            final MapRDirectShuffleMetaData jobData = new MapRDirectShuffleMetaData();
            jobData.putDirPathId(rootDirName, rootPathId);
            jobData.setNodeManageHostName(LOCALHOSTNAME);

            // One job directory under each non-root directory.
            for (int ordinal = FidId.ROOT.ordinal() + 1; ordinal < roots.length; ordinal++) {
                final String jobDirFid = this.maprfs.mkdirsFid(roots[ordinal], jobID);
                final MapRPathId jobDirPathId = new MapRPathId();
                jobDirPathId.setFid(jobDirFid);
                jobDirPathId.setIps(rootPathId.getIPs());
                jobData.putDirPathId(this.rootDirNames[ordinal], jobDirPathId);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(FidId.values()[ordinal].name() + " fid for " + jobID + ": " + jobDirFid);
                }
                if (user != null) {
                    this.maprfs.setOwnerFid(jobDirFid, user, group);
                }
            }
            // Published only after every directory was created successfully.
            this.jobMetaData.put(jobID, jobData);
        } catch (IOException e) {
            LOG.error("Error during initializeApplication. App Id: " + applicationId.getId() + ", Job Id: " + jobID, e);
        }
    }

    /**
     * Schedules the deletion of the application's per-job directories and the removal of its
     * per-job metadata. While the service is flagged as shutting down nothing is cleaned up,
     * because the application might still be running.
     *
     * @param applicationTerminationContext supplies the id of the application that stopped
     */
    public void stopApplication(ApplicationTerminationContext applicationTerminationContext) {
        final ApplicationId applicationId = applicationTerminationContext.getApplicationId();
        final String jobID = new JobID(Long.toString(applicationId.getClusterTimestamp()), applicationId.getId()).toString();

        if (this.isShuttingDown) {
            LOG.info("NodeManager is shutting down but " + applicationId.toString() + "/" + jobID + " might still be running. Not cleaning up the " + jobID + " directory in the local volume.");
            return;
        }

        this.deletionService.schedule(new Runnable() { // from class: com.mapr.hadoop.mapred.LocalVolumeAuxService.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    for (int ordinal = FidId.ROOT.ordinal() + 1; ordinal < fidRoots.length; ordinal++) {
                        final boolean deleted = maprfs.deleteFid(fidRoots[ordinal], jobID);
                        if (deleted) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Deleted " + jobID + " from " + FidId.values()[ordinal]);
                            }
                        } else {
                            LOG.warn(jobID + " could not be deleted from " + FidId.values()[ordinal] + ". Parent Fid: " + fidRoots[ordinal]);
                        }
                    }
                } catch (Throwable failure) {
                    LOG.error("Error during removing localvolume data for Job Id: " + jobID, failure);
                } finally {
                    // The per-job metadata is dropped whether or not the directories could be deleted.
                    jobMetaData.remove(jobID);
                }
            }
        }, 0L, TimeUnit.SECONDS);
    }

    /**
     * Serializes the service-wide shuffle metadata.
     *
     * @return a buffer wrapping the serialized metadata, or null if serialization failed
     */
    public ByteBuffer getMetaData() {
        final DataOutputBuffer serialized = new DataOutputBuffer();
        try {
            this.metaData.write(serialized);
        } catch (IOException e) {
            LOG.error("Encountered error while returning metadata.", e);
            return null;
        }
        // Wrap only the bytes that were actually written.
        return ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
    }

    /**
     * Serializes the shuffle metadata of the job that the given container belongs to. Falls back
     * to the service-wide metadata when the application id cannot be derived from the context or
     * no per-job metadata is recorded for the job.
     *
     * @param containerInitializationContext context of the container being initialized; may be null
     * @return a buffer wrapping the serialized metadata, or null if serialization failed
     */
    public ByteBuffer getMetaData(ContainerInitializationContext containerInitializationContext) {
        final ApplicationId applicationId = applicationIdOf(containerInitializationContext);
        if (applicationId == null) {
            LOG.warn("context is null in getMetaData for context. Returning service metadata");
            return getMetaData();
        }

        final String jobId = new JobID(Long.toString(applicationId.getClusterTimestamp()), applicationId.getId()).toString();
        final MapRDirectShuffleMetaData jobData = this.jobMetaData.get(jobId);
        if (jobData == null) {
            LOG.error("Can not find metadata for a job. Returning service metadata");
            return getMetaData();
        }

        final DataOutputBuffer serialized = new DataOutputBuffer();
        try {
            jobData.write(serialized);
        } catch (IOException e) {
            LOG.error("Encountered error while returning metadata.", e);
            return null;
        }
        return ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
    }

    /** Returns the application id reachable from the context, or null if any link of the chain is null. */
    private static ApplicationId applicationIdOf(ContainerInitializationContext context) {
        if (context == null || context.getContainerId() == null) {
            return null;
        }
        if (context.getContainerId().getApplicationAttemptId() == null) {
            return null;
        }
        return context.getContainerId().getApplicationAttemptId().getApplicationId();
    }

    /**
     * Creates the NodeManager root directory, its "fidservers" file and the output/spill
     * directories, records their path ids in the service metadata and publishes their fids.
     *
     * <p>The fid array is built locally and assigned to the field only once it is complete, so
     * threads reading the field never observe a partially filled array while the volume is being
     * re-initialized.
     *
     * @throws IOException if any directory or the fidservers file cannot be created
     */
    private void initMapReduceDirs() throws IOException {
        final String[] roots = new String[FidId.values().length];

        final String rootFid = this.maprfs.mkdirsFid(new Path(getNodeManagerDirPath()));
        roots[FidId.ROOT.ordinal()] = rootFid;

        final MapRPathId rootPathId = new MapRPathId();
        rootPathId.setFid(rootFid);
        final FSDataOutputStream fidServersFile = this.maprfs.createFid(rootFid, "fidservers");
        try {
            rootPathId.setIps(fidServersFile.getFidServers());
        } finally {
            // Close the stream even when the fid servers cannot be read.
            fidServersFile.close();
        }
        this.metaData.putDirPathId(this.rootDirNames[FidId.ROOT.ordinal()], rootPathId);
        LOG.info("root fid : " + rootPathId.getFid());

        // Output and spill directories, using the names resolved once in serviceInit() so that
        // they match what the health check and application initialization look up.
        for (int ordinal = FidId.ROOT.ordinal() + 1; ordinal < roots.length; ordinal++) {
            final String dirName = this.rootDirNames[ordinal];
            final PathId dirPathId = createDirAndGetPathId(rootPathId, dirName);
            this.metaData.putDirPathId(dirName, dirPathId);
            roots[ordinal] = dirPathId.getFid();
        }

        this.fidRoots = roots;
    }

    /**
     * Creates the directory {@code str} below the directory identified by {@code pathId}.
     *
     * @param pathId path id of the parent directory
     * @param str name of the directory to create
     * @return a path id carrying the new directory's fid and the parent's IPs
     * @throws IOException if the directory cannot be created
     */
    private PathId createDirAndGetPathId(PathId pathId, String str) throws IOException {
        final String childFid = this.maprfs.mkdirsFid(pathId.getFid(), str);
        final MapRPathId childPathId = new MapRPathId();
        childPathId.setFid(childFid);
        // The child is reached through the same servers as its parent.
        childPathId.setIps(pathId.getIPs());
        LOG.info(str + " fid : " + childFid);
        return childPathId;
    }

    /**
     * Returns the mount path of the local mapreduce volume: the configured
     * "mapr.mapred.localvolume.mount.path", or a per-host default below /var/mapr/local.
     */
    @VisibleForTesting
    String getMapRedLocalVolumeMountPath() {
        final String perHostDefault = "/var/mapr/local/" + LOCALHOSTNAME + "/mapred";
        return this.conf.get("mapr.mapred.localvolume.mount.path", perHostDefault);
    }

    /**
     * Returns the NodeManager root directory on the local volume: the configured
     * "mapr.mapred.localvolume.root.dir.path", or "nodeManager" below the volume mount path.
     */
    @VisibleForTesting
    String getNodeManagerDirPath() {
        final String defaultRootDir = getMapRedLocalVolumeMountPath() + "/nodeManager";
        return this.conf.get("mapr.mapred.localvolume.root.dir.path", defaultRootDir);
    }
}
