package org.apache.hadoop.yarn.server.sharedcachemanager.store;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;

@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.class */
public class InMemorySCMStore extends SCMStore {
    // Class-wide logger.
    private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class);
    // Resource key -> cached resource entry. Individual entries are read and
    // modified while synchronizing on the interned key string.
    private final Map<String, SharedCacheResource> cachedResources;
    // Applications reported as active when the service started; pruned by the
    // periodic AppCheckTask. Guarded by initialAppsLock.
    private Collection<ApplicationId> initialApps;
    // Monitor protecting every access to initialApps.
    private final Object initialAppsLock;
    // Wall-clock time (ms) captured in serviceInit.
    private long startTime;
    // Staleness threshold in minutes, read from the configuration in serviceInit.
    private int stalenessMinutes;
    // Single-threaded scheduler that runs the AppCheckTask.
    private ScheduledExecutorService scheduler;
    // Delay, in minutes, before the first AppCheckTask run.
    private int initialDelayMin;
    // Period, in minutes, between AppCheckTask runs.
    private int checkPeriodMin;

    @InterfaceAudience.Private
    @InterfaceStability.Evolving
    /* loaded from: input_file:org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore$AppCheckTask.class */
    class AppCheckTask implements Runnable {
        private final AppChecker taskAppChecker;

        public AppCheckTask(AppChecker appChecker) {
            this.taskAppChecker = appChecker;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                InMemorySCMStore.LOG.info("Checking the initial app list for finished applications.");
                synchronized (InMemorySCMStore.this.initialAppsLock) {
                    if (!InMemorySCMStore.this.initialApps.isEmpty()) {
                        InMemorySCMStore.LOG.info("Looking into " + InMemorySCMStore.this.initialApps.size() + " apps to see if they are still active");
                        Iterator it = InMemorySCMStore.this.initialApps.iterator();
                        while (it.hasNext()) {
                            try {
                                if (!this.taskAppChecker.isApplicationActive((ApplicationId) it.next())) {
                                    it.remove();
                                }
                            } catch (YarnException e) {
                                InMemorySCMStore.LOG.warn("Exception while checking the app status; will leave the entry in the list", e);
                            }
                        }
                    }
                    InMemorySCMStore.LOG.info("There are now " + InMemorySCMStore.this.initialApps.size() + " entries in the list");
                }
            } catch (Throwable th) {
                InMemorySCMStore.LOG.error("Unexpected exception thrown during in-memory store app check task. Rescheduling task.", th);
            }
        }
    }

    /**
     * Creates an empty store registered under this class's name. The resource
     * map and the initial application list start out empty.
     */
    public InMemorySCMStore() {
        super(InMemorySCMStore.class.getName());
        this.initialAppsLock = new Object();
        this.initialApps = new ArrayList<ApplicationId>();
        this.cachedResources = new ConcurrentHashMap<String, SharedCacheResource>();
    }

    /**
     * Test-only constructor that hands a caller-supplied application checker
     * to the superclass. The resource map and the initial application list
     * start out empty.
     *
     * @param appChecker the application checker passed to the superclass
     */
    @VisibleForTesting
    public InMemorySCMStore(AppChecker appChecker) {
        super(InMemorySCMStore.class.getName(), appChecker);
        this.initialAppsLock = new Object();
        this.initialApps = new ArrayList<ApplicationId>();
        this.cachedResources = new ConcurrentHashMap<String, SharedCacheResource>();
    }

    /**
     * Returns the weakly-interned form of the given key. The other methods of
     * this class synchronize on the returned string as a per-key monitor.
     *
     * @param str the resource key
     * @return the result of {@code StringInterner.weakIntern(str)}
     */
    private String intern(String str) {
        final String canonical = StringInterner.weakIntern(str);
        return canonical;
    }

    /**
     * Initializes the store: records the start time, reads and validates the
     * timing settings, loads the resources already present in the file system
     * and creates the scheduler for the app check task.
     *
     * <p>NOTE(review): the decompiler reports that this method's access
     * modifier was changed from {@code protected}; it is left {@code public}
     * here so existing callers keep compiling.
     *
     * @param configuration the service configuration
     * @throws Exception if a setting is invalid, bootstrapping fails or the
     *     superclass initialization fails
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public void serviceInit(Configuration configuration) throws Exception {
        this.startTime = System.currentTimeMillis();

        // Each getter throws on a non-positive value, so validation order is preserved.
        this.initialDelayMin = getInitialDelay(configuration);
        this.checkPeriodMin = getCheckPeriod(configuration);
        this.stalenessMinutes = getStalenessPeriod(configuration);

        bootstrap(configuration);

        ThreadFactoryBuilder namedThreads = new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore");
        this.scheduler = HadoopExecutors.newSingleThreadScheduledExecutor(namedThreads.build());

        super.serviceInit(configuration);
    }

    /**
     * Starts the store: after the superclass has started, takes the list of
     * currently active applications from the app checker and schedules the
     * periodic {@link AppCheckTask}.
     *
     * @throws Exception if the superclass fails to start or the active
     *     application list cannot be obtained
     */
    @Override
    protected void serviceStart() throws Exception {
        super.serviceStart();
        LOG.info("Getting the active app list to initialize the in-memory scm store");
        final int activeAppCount;
        synchronized (this.initialAppsLock) {
            this.initialApps = this.appChecker.getActiveApplications();
            // Read the size while still holding the lock: initialApps is
            // guarded by initialAppsLock everywhere else in this class.
            activeAppCount = this.initialApps.size();
        }
        LOG.info(activeAppCount + " apps recorded as active at this time");
        this.scheduler.scheduleAtFixedRate(new AppCheckTask(this.appChecker), this.initialDelayMin, this.checkPeriodMin, TimeUnit.MINUTES);
        LOG.info("Scheduled the in-memory scm store app check task to run every " + this.checkPeriodMin + " minutes.");
    }

    /**
     * Stops the store: shuts the scheduler down, waits up to ten seconds for
     * the app check task to finish, then stops the superclass.
     *
     * <p>If the wait is interrupted, the thread's interrupt status is restored
     * once the superclass has been stopped, so callers can still observe the
     * interruption.
     *
     * @throws Exception if the superclass fails to stop
     */
    @Override
    protected void serviceStop() throws Exception {
        LOG.info("Stopping the " + InMemorySCMStore.class.getSimpleName() + " service.");
        boolean interrupted = false;
        if (this.scheduler != null) {
            LOG.info("Shutting down the background thread.");
            this.scheduler.shutdownNow();
            try {
                if (this.scheduler.awaitTermination(10L, TimeUnit.SECONDS)) {
                    // Only claim the thread stopped when termination was observed.
                    LOG.info("The background thread stopped.");
                } else {
                    LOG.warn("Gave up waiting for the app check task to shutdown.");
                }
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("The InMemorySCMStore was interrupted while shutting down the app check task.", e);
            }
        }
        try {
            super.serviceStop();
        } finally {
            if (interrupted) {
                // Deferred until after super.serviceStop() so that the rest of
                // the shutdown does not run with the interrupt flag set.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Seeds the in-memory map with the resources found in the shared cache
     * directory of the configured file system.
     *
     * @param configuration the configuration used to obtain the file system
     *     and the shared cache settings
     * @throws IOException if the file system cannot be queried
     */
    private void bootstrap(Configuration configuration) throws IOException {
        FileSystem fs = FileSystem.get(configuration);
        Map<String, String> discovered = getInitialCachedResources(fs, configuration);
        LOG.info("Bootstrapping from " + discovered.size() + " cache resources located in the file system");
        // Entries are removed from the temporary map as soon as they are transferred.
        for (Iterator<Map.Entry<String, String>> entries = discovered.entrySet().iterator(); entries.hasNext();) {
            Map.Entry<String, String> entry = entries.next();
            String key = intern(entry.getKey());
            SharedCacheResource resource = new SharedCacheResource(entry.getValue());
            this.cachedResources.put(key, resource);
            entries.remove();
        }
        LOG.info("Bootstrapping complete");
    }

    /**
     * Scans the shared cache root directory and builds a map from each
     * resource key (the name of a file's parent directory) to the name of one
     * file stored under that key. When several files share a key, the first
     * one encountered wins and the others are logged and skipped.
     *
     * @param fileSystem the file system holding the shared cache
     * @param configuration the configuration supplying the root directory and
     *     the cache depth
     * @return a map of resource key to file name
     * @throws IOException if the root directory does not exist or the file
     *     system query fails
     */
    @VisibleForTesting
    Map<String, String> getInitialCachedResources(FileSystem fileSystem, Configuration configuration) throws IOException {
        final String rootLocation = configuration.get("yarn.sharedcache.root-dir", "/sharedcache");
        final Path rootDir = new Path(rootLocation);
        if (!fileSystem.exists(rootDir)) {
            final String message = "The shared cache root directory " + rootLocation + " was not found";
            LOG.error(message);
            throw new IOException(message);
        }

        // One level below the configured cache depth, i.e. the resource files themselves.
        final int globDepth = SharedCacheUtil.getCacheDepth(configuration) + 1;
        final String globPattern = SharedCacheUtil.getCacheEntryGlobPattern(globDepth);
        LOG.info("Querying for all individual cached resource files");
        final FileStatus[] entries = fileSystem.globStatus(new Path(rootDir, globPattern));
        final int found = (entries != null) ? entries.length : 0;
        LOG.info("Found " + found + " files: processing for one resource per key");

        final HashMap<String, String> keyToFile = new HashMap<String, String>();
        for (int i = 0; i < found; i++) {
            final FileStatus entry = entries[i];
            final Path filePath = entry.getPath();
            final String fileName = filePath.getName();
            if (!entry.isFile()) {
                continue;
            }
            final Path keyDir = filePath.getParent();
            if (keyDir == null) {
                continue;
            }
            final String key = keyDir.getName();
            if (!keyToFile.containsKey(key)) {
                keyToFile.put(key, fileName);
            } else {
                LOG.warn("Key " + key + " is already mapped to file " + keyToFile.get(key) + "; file " + fileName + " will not be added");
            }
        }
        LOG.info("A total of " + keyToFile.size() + " files are now mapped");
        return keyToFile;
    }

    /**
     * Registers a resource under the given key unless one is already present.
     *
     * @param str the resource key
     * @param str2 the file name to store if the key is not yet known
     * @return the file name now associated with the key: the existing one if
     *     the key was already present, otherwise {@code str2}'s entry
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public String addResource(String str, String str2) {
        final String key = intern(str);
        synchronized (key) {
            final SharedCacheResource existing = this.cachedResources.get(key);
            if (existing != null) {
                return existing.getFileName();
            }
            final SharedCacheResource created = new SharedCacheResource(str2);
            this.cachedResources.put(key, created);
            return created.getFileName();
        }
    }

    /**
     * Adds a reference to the resource stored under the given key and
     * refreshes that resource's access time.
     *
     * @param str the resource key
     * @param sharedCacheResourceReference the reference to add
     * @return the resource's file name, or {@code null} if the key is unknown
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public String addResourceReference(String str, SharedCacheResourceReference sharedCacheResourceReference) {
        final String key = intern(str);
        synchronized (key) {
            final SharedCacheResource resource = this.cachedResources.get(key);
            if (resource != null) {
                resource.addReference(sharedCacheResourceReference);
                resource.updateAccessTime();
                return resource.getFileName();
            }
            return null;
        }
    }

    /**
     * Returns a read-only snapshot of the references held on the resource
     * stored under the given key.
     *
     * @param str the resource key
     * @return an unmodifiable copy of the references, or an empty set if the
     *     key is unknown
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public Collection<SharedCacheResourceReference> getResourceReferences(String str) {
        final String key = intern(str);
        synchronized (key) {
            final SharedCacheResource resource = this.cachedResources.get(key);
            if (resource != null) {
                // Copy first so later changes to the live set do not show through.
                return Collections.unmodifiableSet(
                        new HashSet<SharedCacheResourceReference>(resource.getResourceReferences()));
            }
            return Collections.emptySet();
        }
    }

    /**
     * Removes a single reference from the resource stored under the given key.
     *
     * @param str the resource key
     * @param sharedCacheResourceReference the reference to remove
     * @param z whether to refresh the resource's access time
     * @return {@code true} if the reference was present and removed;
     *     {@code false} if it was absent or the key is unknown
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public boolean removeResourceReference(String str, SharedCacheResourceReference sharedCacheResourceReference, boolean z) {
        final String key = intern(str);
        synchronized (key) {
            final SharedCacheResource resource = this.cachedResources.get(key);
            if (resource == null) {
                return false;
            }
            final boolean removed = resource.getResourceReferences().remove(sharedCacheResourceReference);
            if (z) {
                resource.updateAccessTime();
            }
            return removed;
        }
    }

    /**
     * Removes a batch of references from the resource stored under the given
     * key. Does nothing if the key is unknown.
     *
     * @param str the resource key
     * @param collection the references to remove
     * @param z whether to refresh the resource's access time
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public void removeResourceReferences(String str, Collection<SharedCacheResourceReference> collection, boolean z) {
        final String key = intern(str);
        synchronized (key) {
            final SharedCacheResource resource = this.cachedResources.get(key);
            if (resource == null) {
                return;
            }
            resource.getResourceReferences().removeAll(collection);
            if (z) {
                resource.updateAccessTime();
            }
        }
    }

    /**
     * Runs the superclass clean-up for the given key while holding that key's
     * monitor.
     *
     * @param str the resource key
     * @throws YarnException if the superclass clean-up fails
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public void cleanResourceReferences(String str) throws YarnException {
        final String keyMonitor = intern(str);
        synchronized (keyMonitor) {
            super.cleanResourceReferences(str);
        }
    }

    /**
     * Removes the resource stored under the given key, provided it has no
     * remaining references.
     *
     * @param str the resource key
     * @return {@code true} if the resource was removed or was not present;
     *     {@code false} if it still has references and was kept
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public boolean removeResource(String str) {
        final String key = intern(str);
        synchronized (key) {
            final SharedCacheResource resource = this.cachedResources.get(key);
            if (resource != null) {
                if (!resource.getResourceReferences().isEmpty()) {
                    return false;
                }
                this.cachedResources.remove(key);
            }
            return true;
        }
    }

    /**
     * Looks up the access time of the resource stored under the given key.
     *
     * @param str the resource key
     * @return the resource's access time, or {@code -1} if the key is unknown
     */
    @VisibleForTesting
    long getAccessTime(String str) {
        final String key = intern(str);
        synchronized (key) {
            final SharedCacheResource resource = this.cachedResources.get(key);
            if (resource == null) {
                return -1L;
            }
            return resource.getAccessTime();
        }
    }

    /**
     * Decides whether the resource stored under the given key may be evicted.
     *
     * <p>Nothing is evictable while the initial application list is non-empty.
     * After that, a resource is evictable when its last known use is older
     * than the staleness period. The last known use is the tracked access time
     * when the key is known; otherwise it is the later of the file's
     * modification time and the store's start time.
     *
     * @param str the resource key
     * @param fileStatus the status of the resource's file, consulted only when
     *     the key has no tracked access time
     * @return {@code true} if the resource is stale enough to evict
     */
    @Override // org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore
    public boolean isResourceEvictable(String str, FileStatus fileStatus) {
        // Hold the list lock only for the emptiness check, so it is never held
        // while getAccessTime takes the per-key monitor.
        synchronized (this.initialAppsLock) {
            if (!this.initialApps.isEmpty()) {
                return false;
            }
        }

        final long staleTime = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(this.stalenessMinutes);
        final long accessTime = getAccessTime(str);
        if (accessTime != -1) {
            return accessTime < staleTime;
        }

        // No tracked access: a modification older than the store's start time
        // is treated as if the resource had been used at start time.
        final long lastUse = Math.max(fileStatus.getModificationTime(), this.startTime);
        return lastUse < staleTime;
    }

    /**
     * Reads the staleness period, in minutes, from the configuration
     * (default 10080).
     *
     * @param configuration the configuration to read
     * @return the configured staleness period
     * @throws HadoopIllegalArgumentException if the value is not positive
     */
    private static int getStalenessPeriod(Configuration configuration) {
        final int stalenessMins = configuration.getInt("yarn.sharedcache.store.in-memory.staleness-period-mins", 10080);
        if (stalenessMins > 0) {
            return stalenessMins;
        }
        throw new HadoopIllegalArgumentException("Non-positive staleness value: " + stalenessMins + ". The staleness value must be greater than zero.");
    }

    /**
     * Reads the initial delay of the app check task, in minutes, from the
     * configuration (default 10).
     *
     * @param configuration the configuration to read
     * @return the configured initial delay
     * @throws HadoopIllegalArgumentException if the value is not positive
     */
    private static int getInitialDelay(Configuration configuration) {
        final int delayMins = configuration.getInt("yarn.sharedcache.store.in-memory.initial-delay-mins", 10);
        if (delayMins > 0) {
            return delayMins;
        }
        throw new HadoopIllegalArgumentException("Non-positive initial delay value: " + delayMins + ". The initial delay value must be greater than zero.");
    }

    /**
     * Reads the period of the app check task, in minutes, from the
     * configuration (default 720).
     *
     * @param configuration the configuration to read
     * @return the configured check period
     * @throws HadoopIllegalArgumentException if the value is not positive
     */
    private static int getCheckPeriod(Configuration configuration) {
        final int periodMins = configuration.getInt("yarn.sharedcache.store.in-memory.check-period-mins", 720);
        if (periodMins > 0) {
            return periodMins;
        }
        throw new HadoopIllegalArgumentException("Non-positive check period value: " + periodMins + ". The check period value must be greater than zero.");
    }
}
