package org.apache.sentry.hdfs;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.hdfs.Updateable;
import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
import org.apache.sentry.provider.db.service.persistent.HAContext;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sentry/hdfs/PluginCacheSyncUtil.class */
public class PluginCacheSyncUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(PluginCacheSyncUtil.class);
    public static final long CACHE_GC_SIZE_THRESHOLD_HWM = 100;
    public static final long CACHE_GC_SIZE_THRESHOLD_LWM = 50;
    public static final long CACHE_GC_SIZE_MAX_CLEANUP = 1000;
    public static final long ZK_COUNTER_INIT_VALUE = 4;
    public static final long GC_COUNTER_INIT_VALUE = 5;
    private final String zkPath;
    private final HAContext haContext;
    private final PathChildrenCache cache;
    private InterProcessSemaphoreMutex updatorLock;
    private InterProcessSemaphoreMutex gcLock;
    private int lockTimeout;
    private DistributedAtomicLong updateCounter;
    private DistributedAtomicLong gcCounter;
    private final ScheduledExecutorService gcSchedulerForZk = Executors.newScheduledThreadPool(1);

    public PluginCacheSyncUtil(String str, final Configuration configuration, PathChildrenCacheListener pathChildrenCacheListener) throws SentryPolicyStorePlugin.SentryPluginException {
        this.zkPath = str;
        try {
            this.haContext = HAContext.getHAContext(configuration);
            this.haContext.startCuratorFramework();
            this.cache = new PathChildrenCache(this.haContext.getCuratorFramework(), str + "/cache", true);
            this.cache.getListenable().addListener(pathChildrenCacheListener);
            try {
                this.cache.start();
                this.updatorLock = new InterProcessSemaphoreMutex(this.haContext.getCuratorFramework(), str + "/lock");
                this.lockTimeout = configuration.getInt("sentry.hdfs.init.update.retry.delay.ms", 10000);
                this.gcLock = new InterProcessSemaphoreMutex(this.haContext.getCuratorFramework(), str + "/gclock");
                this.updateCounter = new DistributedAtomicLong(this.haContext.getCuratorFramework(), str + "/counter", this.haContext.getRetryPolicy());
                try {
                    this.updateCounter.initialize(4L);
                } catch (Exception e) {
                    LOGGER.error("Error initializing  counter for zpath " + str, e);
                }
                this.gcCounter = new DistributedAtomicLong(this.haContext.getCuratorFramework(), str + "/gccounter", this.haContext.getRetryPolicy());
                try {
                    this.gcCounter.initialize(5L);
                } catch (Exception e2) {
                    LOGGER.error("Error initializing  counter for zpath " + str, e2);
                }
                this.gcSchedulerForZk.scheduleAtFixedRate(new Runnable() { // from class: org.apache.sentry.hdfs.PluginCacheSyncUtil.1
                    @Override // java.lang.Runnable
                    public void run() {
                        PluginCacheSyncUtil.this.gcPluginCache(configuration);
                    }
                }, 10L, 10L, TimeUnit.MINUTES);
            } catch (Exception e3) {
                throw new SentryPolicyStorePlugin.SentryPluginException("Error creating ZK PathCache ", e3);
            }
        } catch (Exception e4) {
            throw new SentryPolicyStorePlugin.SentryPluginException("Error creating HA context ", e4);
        }
    }

    public void handleCacheUpdate(Updateable.Update update) throws SentryPolicyStorePlugin.SentryPluginException {
        Timer.Context time = SentryHdfsMetricsUtil.getCacheSyncToZKTimer.time();
        try {
            if (!this.updatorLock.acquire(this.lockTimeout, TimeUnit.MILLISECONDS)) {
                throw new SentryPolicyStorePlugin.SentryPluginException("Failed to get ZK lock for update cache syncup");
            }
            try {
                try {
                    if (!update.hasFullImage()) {
                        update.setSeqNum(((Long) this.updateCounter.increment().postValue()).longValue());
                    } else if (((Long) this.updateCounter.get().preValue()).longValue() < update.getSeqNum()) {
                        this.updateCounter.add(Long.valueOf(update.getSeqNum() - ((Long) this.updateCounter.get().preValue()).longValue()));
                    }
                    try {
                        this.haContext.getCuratorFramework().create().creatingParentsIfNeeded().forPath(ZKPaths.makePath(this.zkPath + "/cache", String.valueOf(update.getSeqNum())), update.serialize());
                        try {
                            this.updatorLock.release();
                            time.stop();
                            if (0 != 0) {
                                SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
                            }
                        } catch (Exception e) {
                            time.stop();
                            SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
                            throw new SentryPolicyStorePlugin.SentryPluginException("Error releasing ZK lock for update cache syncup" + e, e);
                        }
                    } catch (Exception e2) {
                        throw new SentryPolicyStorePlugin.SentryPluginException("error posting update to ZK ", e2);
                    }
                } catch (Exception e3) {
                    throw new SentryPolicyStorePlugin.SentryPluginException("Error setting ZK counter for update cache syncup" + e3, e3);
                }
            } catch (Throwable th) {
                try {
                    this.updatorLock.release();
                    time.stop();
                    if (0 != 0) {
                        SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
                    }
                    throw th;
                } catch (Exception e4) {
                    time.stop();
                    SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
                    throw new SentryPolicyStorePlugin.SentryPluginException("Error releasing ZK lock for update cache syncup" + e4, e4);
                }
            }
        } catch (Exception e5) {
            time.stop();
            SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
            throw new SentryPolicyStorePlugin.SentryPluginException("Error getting ZK lock for update cache syncup" + e5, e5);
        }
    }

    public static void setUpdateFromChildEvent(PathChildrenCacheEvent pathChildrenCacheEvent, Updateable.Update update) throws IOException {
        update.deserialize(pathChildrenCacheEvent.getData().getData());
        update.setSeqNum(Integer.valueOf(ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath())).intValue());
    }

    public void close() throws IOException {
        this.cache.close();
    }

    public long getUpdateCounter() throws Exception {
        return ((Long) this.updateCounter.get().preValue()).longValue();
    }

    @VisibleForTesting
    public void gcPluginCache(Configuration configuration) {
        try {
            try {
                if (!this.gcLock.acquire(500L, TimeUnit.MILLISECONDS)) {
                    if (this.gcLock.isAcquiredInThisProcess()) {
                        try {
                            this.gcLock.release();
                            return;
                        } catch (Exception e) {
                            LOGGER.warn("Error releasing gc lock", e);
                            return;
                        }
                    }
                    return;
                }
                Long l = (Long) this.updateCounter.get().preValue();
                Long l2 = (Long) this.gcCounter.get().preValue();
                if (l.longValue() - l2.longValue() > 100) {
                    Long valueOf = Long.valueOf(Math.min((l.longValue() - l2.longValue()) - 50, 1000L));
                    for (Long l3 = l2; l3.longValue() < l2.longValue() + valueOf.longValue(); l3 = Long.valueOf(l3.longValue() + 1)) {
                        String makePath = ZKPaths.makePath(this.zkPath + "/cache", Long.toString(l3.longValue()));
                        try {
                            this.haContext.getCuratorFramework().delete().forPath(makePath);
                            this.gcCounter.increment();
                            LOGGER.debug("Deleted znode " + makePath);
                        } catch (Exception e2) {
                            LOGGER.info("Error cleaning up node " + makePath, e2);
                        } catch (KeeperException.NoNodeException e3) {
                            this.gcCounter.increment();
                        }
                    }
                }
                if (this.gcLock.isAcquiredInThisProcess()) {
                    try {
                        this.gcLock.release();
                    } catch (Exception e4) {
                        LOGGER.warn("Error releasing gc lock", e4);
                    }
                }
            } catch (Exception e5) {
                LOGGER.warn("Error cleaning the cache", e5);
                if (this.gcLock.isAcquiredInThisProcess()) {
                    try {
                        this.gcLock.release();
                    } catch (Exception e6) {
                        LOGGER.warn("Error releasing gc lock", e6);
                    }
                }
            }
        } catch (Throwable th) {
            if (this.gcLock.isAcquiredInThisProcess()) {
                try {
                    this.gcLock.release();
                } catch (Exception e7) {
                    LOGGER.warn("Error releasing gc lock", e7);
                }
            }
            throw th;
        }
    }
}
