package org.apache.oozie.service;

import com.google.common.annotations.VisibleForTesting;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
import org.apache.oozie.dependency.hcat.HCatDependencyCache;
import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;

/* loaded from: input_file:WEB-INF/lib/oozie-core-5.1.0.300-mapr-630.jar:org/apache/oozie/service/PartitionDependencyManagerService.class */
/**
 * Service facade over an {@link HCatDependencyCache} that records which coordinator actions are
 * waiting on which HCatalog partitions.
 *
 * <p>The cache implementation is read from {@link #CACHE_MANAGER_IMPL}; when that key is unset a
 * {@link SimpleHCatDependencyCache} is used. When {@link JobsConcurrencyService} reports
 * highly-available mode, the service additionally remembers the registration time of every coord
 * action and schedules a {@link CachePurgeWorker} that evicts actions which are no longer waiting.
 */
public class PartitionDependencyManagerService implements Service {
    public static final String CONF_PREFIX = "oozie.service.PartitionDependencyManagerService.";
    public static final String CACHE_MANAGER_IMPL = "oozie.service.PartitionDependencyManagerService.cache.manager.impl";
    public static final String CACHE_PURGE_INTERVAL = "oozie.service.PartitionDependencyManagerService.cache.purge.interval";
    public static final String CACHE_PURGE_TTL = "oozie.service.PartitionDependencyManagerService.cache.purge.ttl";
    private static final XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
    private HCatDependencyCache dependencyCache;
    /** Coord action id -> registration time in epoch millis; only created when purging is enabled. */
    private ConcurrentMap<String, Long> registeredCoordActionMap;
    private boolean purgeEnabled = false;

    /**
     * Periodic task that drops coord actions from the dependency cache once they have been
     * registered for longer than the configured TTL and are either missing from the database or
     * no longer in {@code WAITING} status.
     */
    public class CachePurgeWorker implements Runnable {
        // Retained from the original class layout; the purge itself operates on the enclosing
        // service's dependencyCache field.
        HCatDependencyCache cache;

        public CachePurgeWorker(HCatDependencyCache hCatDependencyCache) {
            this.cache = hCatDependencyCache;
        }

        /**
         * Runs one purge pass using the TTL from {@link #CACHE_PURGE_TTL} (default 1800 seconds).
         * Does nothing when the current thread is interrupted.
         */
        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                purgeMissingDependency(Services.get().getConf().getInt(PartitionDependencyManagerService.CACHE_PURGE_TTL, 1800));
            } catch (Throwable th) {
                // Deliberate best-effort: a failure in one pass is logged and must not propagate
                // out of the scheduled task.
                XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", th);
            }
        }

        /**
         * Removes every registered coord action older than the TTL that is no longer waiting.
         *
         * @param timeToLive minimum age, in seconds, before an action is considered for purging
         */
        private void purgeMissingDependency(int timeToLive) {
            ConcurrentMap<String, Long> registered = PartitionDependencyManagerService.this.registeredCoordActionMap;
            if (registered == null) {
                // Purging is disabled, so nothing was ever registered.
                return;
            }
            // Multiply as long: an int product overflows for TTLs above ~24 days.
            long cutoffMillis = System.currentTimeMillis() - timeToLive * 1000L;
            HashSet<String> purgedActionIds = new HashSet<>();
            // Iterate entries rather than keys + get(): a concurrent remove() between the two
            // calls would otherwise yield a null timestamp and abort the whole pass with an NPE.
            Iterator<Map.Entry<String, Long>> entries = registered.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<String, Long> entry = entries.next();
                if (entry.getValue() >= cutoffMillis) {
                    continue;
                }
                String actionId = entry.getKey();
                if (shouldPurge(actionId)) {
                    purgedActionIds.add(actionId);
                    entries.remove();
                }
            }
            PartitionDependencyManagerService.this.dependencyCache.removeNonWaitingCoordActions(purgedActionIds);
        }

        /**
         * Decides whether a coord action can be dropped from the cache.
         *
         * @param actionId coord action id to look up
         * @return {@code true} when the action is absent from the database (error E0605) or its
         *         status is not {@code WAITING}; {@code false} when it is still waiting, when the
         *         query returns no bean, or when the lookup fails for any other reason
         */
        private boolean shouldPurge(String actionId) {
            try {
                CoordinatorActionBean action = CoordActionQueryExecutor.getInstance()
                        .get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
                return action != null && !action.getStatus().equals(CoordinatorAction.Status.WAITING);
            } catch (JPAExecutorException e) {
                if (e.getErrorCode() == ErrorCode.E0605) {
                    PartitionDependencyManagerService.LOG.info(MessageFormat.format("Coord action {0} is not in database, deleting it from cache", actionId));
                    return true;
                }
                PartitionDependencyManagerService.LOG.warn("Error in checking coord action:" + actionId + " to purge, skipping", e);
                return false;
            }
        }
    }

    /**
     * Initializes the service from the configuration held by {@code services}.
     *
     * @param services the services container supplying the configuration
     * @throws ServiceException declared by the {@link Service} contract
     */
    @Override
    public void init(Services services) throws ServiceException {
        init(services.getConf());
    }

    /**
     * Instantiates and initializes the dependency cache and, in highly-available mode, sets up
     * the registration map and schedules the purge worker (first run after 10 seconds, then every
     * {@link #CACHE_PURGE_INTERVAL} seconds, default 600).
     */
    private void init(Configuration configuration) throws ServiceException {
        Class<?> cls = configuration.getClass(CACHE_MANAGER_IMPL, null);
        this.dependencyCache = cls == null ? new SimpleHCatDependencyCache() : (HCatDependencyCache) ReflectionUtils.newInstance(cls, null);
        this.dependencyCache.init(configuration);
        LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", this.dependencyCache.getClass().getName());
        this.purgeEnabled = ((JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class)).isHighlyAvailableMode();
        if (this.purgeEnabled) {
            // Create the map before the worker is handed to the scheduler so the worker can
            // never observe it uninitialized.
            this.registeredCoordActionMap = new ConcurrentHashMap<>();
            ((SchedulerService) Services.get().get(SchedulerService.class)).schedule(new CachePurgeWorker(this.dependencyCache), 10L, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600), SchedulerService.Unit.SEC);
        }
    }

    /** Destroys the underlying dependency cache. */
    @Override
    public void destroy() {
        this.dependencyCache.destroy();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return PartitionDependencyManagerService.class;
    }

    /**
     * Registers a missing dependency for a coord action. When purging is enabled the current time
     * is recorded as the action's registration time.
     *
     * @param hCatURI dependency URI the action is waiting on
     * @param str     coord action id
     */
    public void addMissingDependency(HCatURI hCatURI, String str) {
        if (this.purgeEnabled) {
            this.registeredCoordActionMap.put(str, System.currentTimeMillis());
        }
        this.dependencyCache.addMissingDependency(hCatURI, str);
    }

    /**
     * Delegates to {@link HCatDependencyCache#removeMissingDependency}.
     *
     * @param hCatURI dependency URI
     * @param str     coord action id
     * @return the result reported by the cache
     */
    public boolean removeMissingDependency(HCatURI hCatURI, String str) {
        return this.dependencyCache.removeMissingDependency(hCatURI, str);
    }

    /**
     * Delegates to {@link HCatDependencyCache#getWaitingActions}.
     *
     * @param hCatURI dependency URI
     * @return the collection returned by the cache
     */
    public Collection<String> getWaitingActions(HCatURI hCatURI) {
        return this.dependencyCache.getWaitingActions(hCatURI);
    }

    /**
     * Marks a dependency as available in the cache and queues a
     * {@link CoordActionUpdatePushMissingDependency} command (with a 100 ms delay) for every
     * action id the cache returns. A warning is logged for each command that cannot be queued.
     *
     * @param str  first argument forwarded to {@code markDependencyAvailable}
     * @param str2 second argument forwarded to {@code markDependencyAvailable}
     * @param str3 third argument forwarded to {@code markDependencyAvailable}
     * @param map  fourth argument forwarded to {@code markDependencyAvailable}
     */
    public void partitionAvailable(String str, String str2, String str3, Map<String, String> map) {
        Collection<String> readyActionIds = this.dependencyCache.markDependencyAvailable(str, str2, str3, map);
        if (readyActionIds == null) {
            return;
        }
        for (String actionId : readyActionIds) {
            CallableQueueService queueService = (CallableQueueService) Services.get().get(CallableQueueService.class);
            if (!queueService.queue(new CoordActionUpdatePushMissingDependency(actionId), 100L)) {
                XLog.getLog(getClass()).warn("Unable to queue the callable commands for PartitionDependencyManagerService for actionID " + actionId + ".Most possibly command queue is full. Queue size is :" + ((CallableQueueService) Services.get().get(CallableQueueService.class)).queueSize());
            }
        }
    }

    /**
     * Delegates to {@link HCatDependencyCache#getAvailableDependencyURIs}.
     *
     * @param str coord action id
     * @return the collection returned by the cache
     */
    public Collection<String> getAvailableDependencyURIs(String str) {
        return this.dependencyCache.getAvailableDependencyURIs(str);
    }

    /**
     * Delegates to {@link HCatDependencyCache#removeAvailableDependencyURIs}.
     *
     * @param str        coord action id
     * @param collection dependency URIs to remove
     * @return the result reported by the cache
     */
    public boolean removeAvailableDependencyURIs(String str, Collection<String> collection) {
        return this.dependencyCache.removeAvailableDependencyURIs(str, collection);
    }

    /**
     * Forgets a coord action: drops its registration time (when purging is enabled) and removes
     * it from the cache via {@code removeCoordActionWithDependenciesAvailable}.
     *
     * @param str coord action id
     */
    public void removeCoordActionWithDependenciesAvailable(String str) {
        if (this.purgeEnabled) {
            this.registeredCoordActionMap.remove(str);
        }
        this.dependencyCache.removeCoordActionWithDependenciesAvailable(str);
    }

    /** Runs a single purge pass synchronously on the calling thread. */
    @VisibleForTesting
    public void runCachePurgeWorker() {
        new CachePurgeWorker(this.dependencyCache).run();
    }
}
