/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.security.token.delegation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.util.ZookeeperClient;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
extends AbstractDelegationTokenSecretManager<TokenIdent> {
    // ---- Configuration keys; every key starts with ZK_CONF_PREFIX. ----
    public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
    public static final String ZK_DTSM_ZK_NUM_RETRIES = "zk-dt-secret-manager.zkNumRetries";
    public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = "zk-dt-secret-manager.zkSessionTimeout";
    public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = "zk-dt-secret-manager.zkConnectionTimeout";
    public static final String ZK_DTSM_ZK_SHUTDOWN_TIMEOUT = "zk-dt-secret-manager.zkShutdownTimeout";
    public static final String ZK_DTSM_ZNODE_WORKING_PATH = "zk-dt-secret-manager.znodeWorkingPath";
    public static final String ZK_DTSM_ZK_AUTH_TYPE = "zk-dt-secret-manager.zkAuthType";
    public static final String ZK_DTSM_ZK_CONNECTION_STRING = "zk-dt-secret-manager.zkConnectionString";
    public static final String ZK_DTSM_ZK_KERBEROS_KEYTAB = "zk-dt-secret-manager.kerberos.keytab";
    public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = "zk-dt-secret-manager.kerberos.principal";
    public static final String ZK_DTSM_ZK_KERBEROS_SERVER_PRINCIPAL = "zk-dt-secret-manager.kerberos.server.principal";
    public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = "zk-dt-secret-manager.token.seqnum.batch.size";
    public static final String ZK_DTSM_TOKEN_WATCHER_ENABLED = "zk-dt-secret-manager.token.watcher.enabled";
    public static final boolean ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT = true;
    public static final String ZK_DTSM_ZK_SSL_ENABLED = "zk-dt-secret-manager.ssl.enabled";
    public static final String ZK_DTSM_ZK_SSL_KEYSTORE_LOCATION = "zk-dt-secret-manager.ssl.keystore.location";
    public static final String ZK_DTSM_ZK_SSL_KEYSTORE_PASSWORD = "zk-dt-secret-manager.ssl.keystore.password";
    public static final String ZK_DTSM_ZK_SSL_TRUSTSTORE_LOCATION = "zk-dt-secret-manager.ssl.truststore.location";
    public static final String ZK_DTSM_ZK_SSL_TRUSTSTORE_PASSWORD = "zk-dt-secret-manager.ssl.truststore.password";

    // ---- Default values for the keys above. ----
    public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
    public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
    public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
    public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000;
    // NOTE: "DEAFULT" is a misspelling, but the constant is public and therefore kept as-is.
    public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
    public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 100;

    private static final Logger LOG = LoggerFactory.getLogger(ZKDelegationTokenSecretManager.class);
    // JAAS login entry name handed to the ZooKeeper client builder.
    private static final String JAAS_LOGIN_ENTRY_NAME = "ZKDelegationTokenSecretManagerClient";

    // ---- znode layout: namespace suffix, counter paths, data roots and node-name prefixes. ----
    private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
    private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot";
    private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot";
    protected static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
    private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot";
    private static final String DELEGATION_KEY_PREFIX = "DK_";
    private static final String DELEGATION_TOKEN_PREFIX = "DT_";

    // Per-thread slot through which a caller can hand in an existing Curator client
    // (see setCurator) before constructing the manager on that same thread.
    private static final ThreadLocal<CuratorFramework> CURATOR_TL = new ThreadLocal<>();

    // True when zkClient was derived from the client found in CURATOR_TL; such a client is
    // neither started nor closed by this class.
    private final boolean isExternalClient;
    protected final CuratorFramework zkClient;
    // Shared counters stored in ZooKeeper for token sequence numbers and master key ids.
    private SharedCount delTokSeqCounter;
    private SharedCount keyIdSeqCounter;
    // Caches over the master-key and token znode subtrees; tokenCache stays null when the
    // token watcher is disabled.
    private CuratorCacheBridge keyCache;
    private CuratorCacheBridge tokenCache;
    // Number of sequence numbers reserved from the shared counter per round trip.
    private final int seqNumBatchSize;
    // Last sequence number handed out locally and the upper bound of the reserved range;
    // both are read and written while holding currentSeqNumLock.
    private int currentSeqNum;
    private int currentMaxSeqNum;
    private final ReentrantLock currentSeqNumLock;
    private final boolean isTokenWatcherEnabled;

    /**
     * Registers {@code curator} for the calling thread. A manager constructed afterwards on
     * the same thread uses a namespaced view of this client instead of creating its own.
     *
     * @param curator the client to register for the current thread
     */
    public static void setCurator(CuratorFramework curator) {
        ZKDelegationTokenSecretManager.CURATOR_TL.set(curator);
    }

    /**
     * Returns the client registered for the calling thread via {@link #setCurator}.
     *
     * @return the thread's registered client, or {@code null} if none was registered
     */
    @VisibleForTesting
    protected static CuratorFramework getCurator() {
        CuratorFramework registered = ZKDelegationTokenSecretManager.CURATOR_TL.get();
        return registered;
    }

    /**
     * Creates the manager from {@code conf}.
     *
     * <p>The four delegation-token intervals are read in seconds and passed to the superclass
     * multiplied by 1000. If a Curator client was registered for the calling thread it is
     * reused (as a namespaced view) and marked external; otherwise a new client is built from
     * the configuration.
     *
     * @param conf configuration supplying intervals, batch size, watcher flag and ZK settings
     */
    public ZKDelegationTokenSecretManager(Configuration conf) {
        super(
            conf.getLong("delegation-token.update-interval.sec", 86400L) * 1000L,
            conf.getLong("delegation-token.max-lifetime.sec", 604800L) * 1000L,
            conf.getLong("delegation-token.renew-interval.sec", 86400L) * 1000L,
            conf.getLong("delegation-token.removal-scan-interval.sec", 3600L) * 1000L);
        this.seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
        this.isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
        this.currentSeqNumLock = new ReentrantLock(true);

        String namespacePath = conf.get(ZK_DTSM_ZNODE_WORKING_PATH, ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT)
            + "/" + ZK_DTSM_NAMESPACE;
        CuratorFramework suppliedClient = CURATOR_TL.get();
        boolean clientWasSupplied = suppliedClient != null;
        this.zkClient = clientWasSupplied
            ? suppliedClient.usingNamespace(namespacePath)
            : ZKDelegationTokenSecretManager.createCuratorClient(conf, namespacePath);
        this.isExternalClient = clientWasSupplied;
    }

    /**
     * Builds (but does not start) a Curator client from the ZK settings in {@code conf}.
     *
     * @param conf      source of connection string, auth, timeout, retry and SSL settings
     * @param namespace namespace the returned client is scoped to
     * @return the configured client
     * @throws RuntimeException wrapping any exception raised while reading the settings or
     *                          building the client
     */
    @VisibleForTesting
    static CuratorFramework createCuratorClient(Configuration conf, String namespace) {
        try {
            final String zkConnection = conf.get(ZK_DTSM_ZK_CONNECTION_STRING);
            final String zkAuthType = conf.get(ZK_DTSM_ZK_AUTH_TYPE);
            final String keytabPath = conf.get(ZK_DTSM_ZK_KERBEROS_KEYTAB, "").trim();
            final String configuredPrincipal = conf.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL, "").trim();
            final String resolvedPrincipal = SecurityUtil.getServerPrincipal(configuredPrincipal, "");
            final int sessionTimeout = conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT, ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT);
            final int connectionTimeout = conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT, ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT);
            final int retries = conf.getInt(ZK_DTSM_ZK_NUM_RETRIES, ZK_DTSM_ZK_NUM_RETRIES_DEFAULT);

            // Spread the session timeout evenly over the retries; guard the division by zero.
            final int retryPause;
            if (retries == 0) {
                retryPause = 0;
            } else {
                retryPause = sessionTimeout / retries;
            }
            RetryNTimes retryPolicy = new RetryNTimes(retries, retryPause);

            final boolean sslEnabled = conf.getBoolean("hadoop.zk.ssl.enabled", conf.getBoolean(ZK_DTSM_ZK_SSL_ENABLED, false));
            final String keystorePath = conf.get(ZK_DTSM_ZK_SSL_KEYSTORE_LOCATION, conf.get("hadoop.zk.ssl.keystore.location", ""));
            final String keystoreSecret = conf.get(ZK_DTSM_ZK_SSL_KEYSTORE_PASSWORD, conf.get("hadoop.zk.ssl.keystore.password", ""));
            final String truststorePath = conf.get(ZK_DTSM_ZK_SSL_TRUSTSTORE_LOCATION, conf.get("hadoop.zk.ssl.truststore.location", ""));
            final String truststoreSecret = conf.get(ZK_DTSM_ZK_SSL_TRUSTSTORE_PASSWORD, conf.get("hadoop.zk.ssl.truststore.password", ""));

            ZKCuratorManager.HadoopZookeeperFactory zkFactory = new ZKCuratorManager.HadoopZookeeperFactory(
                conf.get(ZK_DTSM_ZK_KERBEROS_SERVER_PRINCIPAL),
                conf.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL),
                conf.get(ZK_DTSM_ZK_KERBEROS_KEYTAB),
                sslEnabled,
                new SecurityUtil.TruststoreKeystore(conf));

            return ZookeeperClient.configure()
                .withConnectionString(zkConnection)
                .withNamespace(namespace)
                .withZookeeperFactory(zkFactory)
                .withAuthType(zkAuthType)
                .withKeytab(keytabPath)
                .withPrincipal(resolvedPrincipal)
                .withJaasLoginEntryName(JAAS_LOGIN_ENTRY_NAME)
                .withRetryPolicy(retryPolicy)
                .withSessionTimeout(sessionTimeout)
                .withConnectionTimeout(connectionTimeout)
                .enableSSL(sslEnabled)
                .withKeystore(keystorePath)
                .withKeystorePassword(keystoreSecret)
                .withTruststore(truststorePath)
                .withTruststorePassword(truststoreSecret)
                .create();
        }
        catch (Exception failure) {
            throw new RuntimeException("Could not Load ZK acls or auth: " + failure, failure);
        }
    }

    /**
     * Brings up all ZooKeeper-backed state and then starts the superclass threads.
     *
     * <p>Steps, in order: start the Curator client (only when this instance created it),
     * create the namespace znode, start the token sequence counter and reserve the first
     * batch of sequence numbers, start the key id counter, create the master-key and token
     * root znodes, start the key cache and load it, and, when the token watcher is enabled,
     * start the token cache and load it.
     *
     * @throws IOException      if the client, namespace, counters or caches cannot be started
     * @throws RuntimeException if the root znodes cannot be created; the underlying failure
     *                          is attached as the cause
     */
    @Override
    public void startThreads() throws IOException {
        if (!this.isExternalClient) {
            try {
                this.zkClient.start();
            }
            catch (Exception e) {
                throw new IOException("Could not start Curator Framework", e);
            }
        }
        // Create the namespace znode itself through a view of the client with no namespace applied.
        CuratorFramework nullNsFw = this.zkClient.usingNamespace(null);
        try {
            String nameSpace = "/" + this.zkClient.getNamespace();
            nullNsFw.create().creatingParentContainersIfNeeded().forPath(nameSpace);
        }
        catch (KeeperException.NodeExistsException ignored) {
            // The namespace znode is already there; nothing to create.
        }
        catch (Exception e) {
            throw new IOException("Could not create namespace", e);
        }
        try {
            this.delTokSeqCounter = new SharedCount(this.zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
            this.delTokSeqCounter.start();
            // Reserve the first batch: currentSeqNum is the counter value before the reservation.
            this.currentSeqNum = this.incrSharedCount(this.delTokSeqCounter, this.seqNumBatchSize);
            this.currentMaxSeqNum = this.currentSeqNum + this.seqNumBatchSize;
            LOG.info("Fetched initial range of seq num, from {} to {} ", (Object)(this.currentSeqNum + 1), (Object)this.currentMaxSeqNum);
        }
        catch (Exception e) {
            throw new IOException("Could not start Sequence Counter", e);
        }
        try {
            this.keyIdSeqCounter = new SharedCount(this.zkClient, ZK_DTSM_KEYID_ROOT, 0);
            this.keyIdSeqCounter.start();
        }
        catch (Exception e) {
            throw new IOException("Could not start KeyId Counter", e);
        }
        try {
            this.createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT);
            this.createPersistentNode(ZK_DTSM_TOKENS_ROOT);
        }
        catch (Exception e) {
            // Keep the original failure attached so the root cause is not lost.
            throw new RuntimeException("Could not create ZK paths", e);
        }
        try {
            this.keyCache = CuratorCache.bridgeBuilder(this.zkClient, ZK_DTSM_MASTER_KEY_ROOT).build();
            CuratorCacheListener keyCacheListener = CuratorCacheListener.builder().forCreatesAndChanges((oldNode, node) -> {
                try {
                    this.processKeyAddOrUpdate(node.getData());
                }
                catch (IOException e) {
                    LOG.error("Error while processing Curator keyCacheListener NODE_CREATED / NODE_CHANGED event");
                    throw new UncheckedIOException(e);
                }
            }).forDeletes(childData -> this.processKeyRemoved(childData.getPath())).build();
            this.keyCache.listenable().addListener(keyCacheListener);
            this.keyCache.start();
            this.loadFromZKCache(false);
        }
        catch (Exception e) {
            throw new IOException("Could not start Curator keyCacheListener for keys", e);
        }
        if (this.isTokenWatcherEnabled) {
            LOG.info("TokenCache is enabled");
            try {
                this.tokenCache = CuratorCache.bridgeBuilder(this.zkClient, ZK_DTSM_TOKENS_ROOT).build();
                CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder().forCreatesAndChanges((oldNode, node) -> {
                    try {
                        this.processTokenAddOrUpdate(node.getData());
                    }
                    catch (IOException e) {
                        LOG.error("Error while processing Curator tokenCacheListener NODE_CREATED / NODE_CHANGED event");
                        throw new UncheckedIOException(e);
                    }
                }).forDeletes(childData -> {
                    try {
                        this.processTokenRemoved((ChildData)childData);
                    }
                    catch (IOException e) {
                        LOG.error("Error while processing Curator tokenCacheListener NODE_DELETED event");
                        throw new UncheckedIOException(e);
                    }
                }).build();
                this.tokenCache.listenable().addListener(tokenCacheListener);
                this.tokenCache.start();
                this.loadFromZKCache(true);
            }
            catch (Exception e) {
                throw new IOException("Could not start Curator tokenCacheListener for tokens", e);
            }
        }
        super.startThreads();
    }

    /**
     * Feeds every entry currently held by the key cache or the token cache into the
     * matching in-memory map. Entries that fail to load are skipped and counted; for the
     * token cache, token owner stats are synced afterwards.
     *
     * @param isTokenCache {@code true} to load from the token cache, {@code false} for the key cache
     */
    private void loadFromZKCache(boolean isTokenCache) {
        final String cacheName;
        if (isTokenCache) {
            cacheName = "token";
        } else {
            cacheName = "key";
        }
        LOG.info("Starting to load {} cache.", (Object)cacheName);

        final Stream<ChildData> entries;
        if (isTokenCache) {
            entries = this.tokenCache.stream();
        } else {
            entries = this.keyCache.stream();
        }

        final AtomicInteger skipped = new AtomicInteger(0);
        entries.forEach(entry -> {
            try {
                if (!isTokenCache) {
                    this.processKeyAddOrUpdate(entry.getData());
                } else {
                    this.processTokenAddOrUpdate(entry.getData());
                }
            }
            catch (Exception loadFailure) {
                LOG.info("Ignoring node {} because it failed to load.", (Object)entry.getPath());
                LOG.debug("Failure exception:", loadFailure);
                skipped.getAndIncrement();
            }
        });

        if (isTokenCache) {
            this.syncTokenOwnerStats();
        }
        int skippedCount = skipped.get();
        if (skippedCount > 0) {
            LOG.warn("Ignored {} nodes while loading {} cache.", (Object)skippedCount, (Object)cacheName);
        }
        LOG.info("Loaded {} cache.", (Object)cacheName);
    }

    /**
     * Deserializes a {@link DelegationKey} from {@code data} and stores it in {@code allKeys}
     * under its key id, replacing any previous entry.
     *
     * @param data serialized key bytes
     * @throws IOException if the key cannot be read from {@code data}
     */
    private void processKeyAddOrUpdate(byte[] data) throws IOException {
        DataInputStream keyStream = new DataInputStream(new ByteArrayInputStream(data));
        DelegationKey decoded = new DelegationKey();
        decoded.readFields(keyStream);
        int keyId = decoded.getKeyId();
        this.allKeys.put(keyId, decoded);
    }

    /**
     * Removes from {@code allKeys} the key whose id is encoded in the last segment of
     * {@code path} (the digits after the first {@code '_'} of that segment). Paths without a
     * slash past position 0, or whose last segment has no {@code '_'} past position 0, are ignored.
     *
     * @param path znode path of the removed key
     */
    private void processKeyRemoved(String path) {
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash <= 0) {
            return;
        }
        String leaf = path.substring(lastSlash + 1);
        int underscore = leaf.indexOf('_');
        if (underscore <= 0) {
            return;
        }
        int keyId = Integer.parseInt(leaf.substring(underscore + 1));
        this.allKeys.remove(keyId);
    }

    /**
     * Deserializes a token record (identifier, renew date, password length, password) from
     * {@code data} and stores it in {@code currentTokens}.
     *
     * <p>The password is read with {@code readFully}, so a record holding fewer password
     * bytes than its declared length is rejected instead of being cached with a partially
     * filled password. A declared length of zero yields an empty password.
     *
     * @param data serialized token record
     * @return the decoded identifier, or {@code null} if the password bytes are incomplete
     * @throws IOException if the identifier, renew date or password length cannot be read
     */
    protected TokenIdent processTokenAddOrUpdate(byte[] data) throws IOException {
        ByteArrayInputStream bin = new ByteArrayInputStream(data);
        DataInputStream din = new DataInputStream(bin);
        AbstractDelegationTokenIdentifier ident = (AbstractDelegationTokenIdentifier)this.createIdentifier();
        ident.readFields(din);
        long renewDate = din.readLong();
        int pwdLen = din.readInt();
        byte[] password = new byte[pwdLen];
        try {
            // read() may stop early; readFully() either fills the array or signals EOF.
            din.readFully(password);
        }
        catch (EOFException truncated) {
            // Incomplete record: keep the existing "no token" contract.
            return null;
        }
        AbstractDelegationTokenSecretManager.DelegationTokenInformation tokenInfo = new AbstractDelegationTokenSecretManager.DelegationTokenInformation(renewDate, password);
        this.currentTokens.put(ident, tokenInfo);
        return (TokenIdent)ident;
    }

    /**
     * Decodes the token identifier stored in the removed node and drops the matching entry
     * from {@code currentTokens}.
     *
     * @param data the removed cache entry
     * @throws IOException if the identifier cannot be read from the node's bytes
     */
    private void processTokenRemoved(ChildData data) throws IOException {
        byte[] serialized = data.getData();
        DataInputStream identStream = new DataInputStream(new ByteArrayInputStream(serialized));
        AbstractDelegationTokenIdentifier removed = (AbstractDelegationTokenIdentifier)this.createIdentifier();
        removed.readFields(identStream);
        this.currentTokens.remove(removed);
    }

    /**
     * Stops the superclass threads, then closes, in this order, the token cache, the token
     * sequence counter, the key id counter, the key cache and finally the Curator client
     * (only when this instance created it). A failure to close one resource is logged and
     * does not prevent the remaining ones from being closed.
     */
    @Override
    public void stopThreads() {
        super.stopThreads();
        this.closeAndLogFailure(this.tokenCache, "Could not stop Delegation Token Cache");
        this.closeAndLogFailure(this.delTokSeqCounter, "Could not stop Delegation Token Counter");
        this.closeAndLogFailure(this.keyIdSeqCounter, "Could not stop Key Id Counter");
        this.closeAndLogFailure(this.keyCache, "Could not stop KeyCache");
        // An externally supplied client is owned by whoever supplied it; leave it open.
        AutoCloseable ownedClient = this.isExternalClient ? null : this.zkClient;
        this.closeAndLogFailure(ownedClient, "Could not stop Curator Framework");
    }

    /** Closes {@code resource} when it is non-null, logging {@code failureMessage} on any exception. */
    private void closeAndLogFailure(AutoCloseable resource, String failureMessage) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        }
        catch (Exception closeFailure) {
            LOG.error(failureMessage, closeFailure);
        }
    }

    /**
     * Creates a persistent znode at {@code nodePath}; an already existing node is only
     * noted at debug level.
     *
     * @param nodePath path of the znode to create
     * @throws Exception an {@link IOException} wrapping any failure other than "node exists"
     */
    private void createPersistentNode(String nodePath) throws Exception {
        try {
            ACLBackgroundPathAndBytesable persistentCreate =
                (ACLBackgroundPathAndBytesable)this.zkClient.create().withMode(CreateMode.PERSISTENT);
            persistentCreate.forPath(nodePath);
        }
        catch (KeeperException.NodeExistsException alreadyThere) {
            LOG.debug(nodePath + " znode already exists !!");
        }
        catch (Exception failure) {
            throw new IOException(nodePath + " znode could not be created !!", failure);
        }
    }

    /**
     * Returns the current value of the shared token sequence counter.
     *
     * @return the counter value as currently known to this instance
     */
    @Override
    protected int getDelegationTokenSeqNum() {
        int sharedValue = this.delTokSeqCounter.getCount();
        return sharedValue;
    }

    /**
     * Adds {@code batchSize} to {@code sharedCount}, retrying until a versioned
     * compare-and-set succeeds.
     *
     * @param sharedCount counter to advance
     * @param batchSize   amount to add
     * @return the counter value that was replaced by the successful update
     * @throws Exception if reading or updating the counter fails
     */
    private int incrSharedCount(SharedCount sharedCount, int batchSize) throws Exception {
        VersionedValue<Integer> observed;
        boolean committed;
        do {
            observed = sharedCount.getVersionedValue();
            committed = sharedCount.trySetCount(observed, observed.getValue() + batchSize);
        } while (!committed);
        return observed.getValue();
    }

    /**
     * Hands out the next token sequence number from the locally reserved range, reserving a
     * new batch from the shared counter when the range is used up.
     *
     * <p>If the thread is interrupted while reserving a batch, the interrupt flag is
     * restored and the local counter is still incremented and returned.
     *
     * @return the next sequence number
     * @throws RuntimeException if the shared counter cannot be incremented
     */
    @Override
    protected int incrementDelegationTokenSeqNum() {
        // Acquire before entering try: if lock() itself failed, the finally block
        // must not call unlock() on a lock this thread does not hold.
        this.currentSeqNumLock.lock();
        try {
            if (this.currentSeqNum >= this.currentMaxSeqNum) {
                try {
                    this.currentSeqNum = this.incrSharedCount(this.delTokSeqCounter, this.seqNumBatchSize);
                    this.currentMaxSeqNum = this.currentSeqNum + this.seqNumBatchSize;
                    LOG.info("Fetched new range of seq num, from {} to {} ", (Object)(this.currentSeqNum + 1), (Object)this.currentMaxSeqNum);
                }
                catch (InterruptedException e) {
                    LOG.debug("Thread interrupted while performing token counter increment", e);
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    throw new RuntimeException("Could not increment shared counter !!", e);
                }
            }
            return ++this.currentSeqNum;
        }
        finally {
            this.currentSeqNumLock.unlock();
        }
    }

    /**
     * Overwrites the shared token sequence counter with {@code seqNum}.
     *
     * @param seqNum the value to store
     * @throws RuntimeException if the counter cannot be written
     */
    @Override
    protected void setDelegationTokenSeqNum(int seqNum) {
        final SharedCount counter = this.delTokSeqCounter;
        try {
            counter.setCount(seqNum);
        }
        catch (Exception failure) {
            throw new RuntimeException("Could not set shared counter !!", failure);
        }
    }

    /**
     * Returns the current value of the shared key id counter.
     *
     * @return the counter value as currently known to this instance
     */
    @Override
    protected int getCurrentKeyId() {
        int sharedKeyId = this.keyIdSeqCounter.getCount();
        return sharedKeyId;
    }

    /**
     * Advances the shared key id counter by one and returns the id reserved by that update.
     *
     * <p>The returned id is derived from the value this call replaced, rather than re-read
     * from the counter afterwards, so an increment made by another instance in between
     * cannot cause this call to return an id it did not reserve.
     *
     * <p>If the thread is interrupted, the interrupt flag is restored and the counter's
     * current value is returned without a guarantee that it was incremented.
     *
     * @return the newly reserved key id
     * @throws RuntimeException if the shared counter cannot be incremented
     */
    @Override
    protected int incrementCurrentKeyId() {
        try {
            // incrSharedCount returns the value before the +1 that this call committed.
            return this.incrSharedCount(this.keyIdSeqCounter, 1) + 1;
        }
        catch (InterruptedException e) {
            LOG.debug("Thread interrupted while performing keyId increment", e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            throw new RuntimeException("Could not increment shared keyId counter !!", e);
        }
        return this.keyIdSeqCounter.getCount();
    }

    /**
     * Looks up a master key, first in {@code allKeys} and then in ZooKeeper; a key found in
     * ZooKeeper is added to {@code allKeys}.
     *
     * @param keyId id of the key to find
     * @return the key, or {@code null} if it is in neither place or the ZooKeeper read failed
     */
    @Override
    protected DelegationKey getDelegationKey(int keyId) {
        DelegationKey cached = (DelegationKey)this.allKeys.get(keyId);
        if (cached != null) {
            return cached;
        }
        try {
            DelegationKey fetched = this.getKeyFromZK(keyId);
            if (fetched != null) {
                this.allKeys.put(keyId, fetched);
            }
            return fetched;
        }
        catch (IOException readFailure) {
            LOG.error("Error retrieving key [" + keyId + "] from ZK", readFailure);
            return null;
        }
    }

    /**
     * Reads and deserializes the master key stored under the key root for {@code keyId}.
     *
     * @param keyId id of the key to read
     * @return the key, or {@code null} if the znode is missing or holds no data
     * @throws IOException wrapping any other failure while reading or decoding
     */
    private DelegationKey getKeyFromZK(int keyId) throws IOException {
        final String keyPath = ZKDelegationTokenSecretManager.getNodePath(ZK_DTSM_MASTER_KEY_ROOT, DELEGATION_KEY_PREFIX + keyId);
        try {
            byte[] raw = (byte[])this.zkClient.getData().forPath(keyPath);
            boolean hasPayload = raw != null && raw.length != 0;
            if (!hasPayload) {
                return null;
            }
            DelegationKey decoded = new DelegationKey();
            decoded.readFields(new DataInputStream(new ByteArrayInputStream(raw)));
            return decoded;
        }
        catch (KeeperException.NoNodeException missing) {
            LOG.error("No node in path [" + keyPath + "]");
            return null;
        }
        catch (Exception failure) {
            throw new IOException(failure);
        }
    }

    /**
     * Looks up token information, first in {@code currentTokens} and then in ZooKeeper;
     * information found in ZooKeeper is added to {@code currentTokens}.
     *
     * @param ident identifier of the token
     * @return the token information, or {@code null} if it is in neither place or the
     *         ZooKeeper read failed
     */
    @Override
    protected AbstractDelegationTokenSecretManager.DelegationTokenInformation getTokenInfo(TokenIdent ident) {
        AbstractDelegationTokenSecretManager.DelegationTokenInformation cached =
            (AbstractDelegationTokenSecretManager.DelegationTokenInformation)this.currentTokens.get(ident);
        if (cached != null) {
            return cached;
        }
        try {
            AbstractDelegationTokenSecretManager.DelegationTokenInformation fetched = this.getTokenInfoFromZK(ident);
            if (fetched != null) {
                this.currentTokens.put(ident, fetched);
            }
            return fetched;
        }
        catch (IOException readFailure) {
            LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber() + "] from ZK", readFailure);
            return null;
        }
    }

    /**
     * Aligns the presence of {@code ident} in {@code currentTokens} with ZooKeeper: adds the
     * token when ZooKeeper has it and the local map does not, removes it when ZooKeeper
     * lacks it and the local map has it. A failed ZooKeeper read is logged and changes nothing.
     *
     * @param ident identifier of the token to reconcile
     */
    protected void syncLocalCacheWithZk(TokenIdent ident) {
        try {
            AbstractDelegationTokenSecretManager.DelegationTokenInformation zkInfo = this.getTokenInfoFromZK(ident);
            if (zkInfo != null) {
                if (!this.currentTokens.containsKey(ident)) {
                    this.currentTokens.put(ident, zkInfo);
                }
            } else if (this.currentTokens.containsKey(ident)) {
                this.currentTokens.remove(ident);
            }
        }
        catch (IOException readFailure) {
            LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber() + "] from ZK", readFailure);
        }
    }

    /**
     * Reads the token information for {@code ident} from ZooKeeper, logging an error when
     * the znode is missing.
     *
     * @param ident identifier of the token
     * @return the stored information, or {@code null} if none could be read
     * @throws IOException if the ZooKeeper read fails for a reason other than a missing node
     */
    protected AbstractDelegationTokenSecretManager.DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) throws IOException {
        final boolean quiet = false;
        return this.getTokenInfoFromZK(ident, quiet);
    }

    /**
     * Reads the token information for {@code ident} from the znode named after its
     * sequence number under the tokens root.
     *
     * @param ident identifier of the token
     * @param quiet when {@code true}, a missing znode is not logged
     * @return the stored information, or {@code null} if none could be read
     * @throws IOException if the ZooKeeper read fails for a reason other than a missing node
     */
    protected AbstractDelegationTokenSecretManager.DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident, boolean quiet) throws IOException {
        String tokenNodeName = DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber();
        String tokenPath = ZKDelegationTokenSecretManager.getNodePath(ZK_DTSM_TOKENS_ROOT, tokenNodeName);
        return this.getTokenInfoFromZK(tokenPath, quiet);
    }

    /**
     * Reads a token record from {@code nodePath} and returns its renew date and password.
     *
     * <p>The password is read with {@code readFully}, so a record holding fewer password
     * bytes than its declared length yields {@code null} instead of information with a
     * partially filled password. A declared length of zero yields an empty password.
     *
     * @param nodePath znode path of the token record
     * @param quiet    when {@code true}, a missing znode is not logged
     * @return the token information, or {@code null} if the znode is missing, empty, or its
     *         password bytes are incomplete
     * @throws IOException wrapping any other failure while reading or decoding
     */
    protected AbstractDelegationTokenSecretManager.DelegationTokenInformation getTokenInfoFromZK(String nodePath, boolean quiet) throws IOException {
        try {
            byte[] data = (byte[])this.zkClient.getData().forPath(nodePath);
            if (data == null || data.length == 0) {
                return null;
            }
            ByteArrayInputStream bin = new ByteArrayInputStream(data);
            DataInputStream din = new DataInputStream(bin);
            // The identifier is decoded only to advance the stream past it.
            ((AbstractDelegationTokenIdentifier)this.createIdentifier()).readFields(din);
            long renewDate = din.readLong();
            int pwdLen = din.readInt();
            byte[] password = new byte[pwdLen];
            try {
                // read() may stop early; readFully() either fills the array or signals EOF.
                din.readFully(password);
            }
            catch (EOFException truncated) {
                // Incomplete record: report "no token information".
                return null;
            }
            return new AbstractDelegationTokenSecretManager.DelegationTokenInformation(renewDate, password);
        }
        catch (KeeperException.NoNodeException e) {
            if (!quiet) {
                LOG.error("No node in path [" + nodePath + "]");
            }
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
        return null;
    }

    /**
     * Persists {@code key} in ZooKeeper as a new master key.
     *
     * @param key the key to store
     * @throws IOException if the key cannot be written
     */
    @Override
    protected void storeDelegationKey(DelegationKey key) throws IOException {
        final boolean isUpdate = false;
        this.addOrUpdateDelegationKey(key, isUpdate);
    }

    /**
     * Persists {@code key} in ZooKeeper as an update of an existing master key.
     *
     * @param key the key to update
     * @throws IOException if the key cannot be written
     */
    @Override
    protected void updateDelegationKey(DelegationKey key) throws IOException {
        final boolean isUpdate = true;
        this.addOrUpdateDelegationKey(key, isUpdate);
    }

    /**
     * Serializes {@code key} and writes it to its znode: overwrites the data when the znode
     * exists, creates a persistent znode otherwise. {@code isUpdate} only influences the
     * debug messages emitted when the node's existence does not match the caller's intent.
     *
     * @param key      the key to write
     * @param isUpdate whether the caller considers this an update of an existing key
     * @throws IOException if serialization fails, or wrapping any ZooKeeper failure other
     *                     than "node exists"
     */
    private void addOrUpdateDelegationKey(DelegationKey key, boolean isUpdate) throws IOException {
        final String keyPath = ZKDelegationTokenSecretManager.getNodePath(ZK_DTSM_MASTER_KEY_ROOT, DELEGATION_KEY_PREFIX + key.getKeyId());
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream keyOut = new DataOutputStream(buffer);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Storing ZKDTSMDelegationKey_" + key.getKeyId());
        }
        key.write(keyOut);
        try {
            boolean nodeExists = this.zkClient.checkExists().forPath(keyPath) != null;
            if (!nodeExists) {
                ((ACLBackgroundPathAndBytesable)this.zkClient.create().withMode(CreateMode.PERSISTENT)).forPath(keyPath, buffer.toByteArray());
                if (isUpdate) {
                    LOG.debug("Updating non existent Key path [" + keyPath + "].. Adding new !!");
                }
            } else {
                Stat written = (Stat)this.zkClient.setData().forPath(keyPath, buffer.toByteArray());
                written.setVersion(-1);
                if (!isUpdate) {
                    LOG.debug("Key with path [" + keyPath + "] already exists.. Updating !!");
                }
            }
        }
        catch (KeeperException.NodeExistsException raced) {
            LOG.debug(keyPath + " znode already exists !!");
        }
        catch (Exception failure) {
            throw new IOException(failure);
        }
        finally {
            buffer.close();
        }
    }

    /**
     * Deletes the znode of {@code key}, retrying until the znode is no longer reported as
     * existing. A node that a peer deleted in the meantime is only noted at debug level.
     *
     * <p>This is best effort: any other failure is logged at debug level, now together with
     * the exception so the cause is visible, and is not propagated.
     *
     * @param key the master key whose znode is to be removed
     */
    @Override
    protected void removeStoredMasterKey(DelegationKey key) {
        String nodeRemovePath = ZKDelegationTokenSecretManager.getNodePath(ZK_DTSM_MASTER_KEY_ROOT, DELEGATION_KEY_PREFIX + key.getKeyId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Removing ZKDTSMDelegationKey_" + key.getKeyId());
        }
        try {
            if (this.zkClient.checkExists().forPath(nodeRemovePath) != null) {
                while (this.zkClient.checkExists().forPath(nodeRemovePath) != null) {
                    try {
                        ((ChildrenDeletable)this.zkClient.delete().guaranteed()).forPath(nodeRemovePath);
                    }
                    catch (KeeperException.NoNodeException nne) {
                        LOG.debug("Node already deleted by peer " + nodeRemovePath);
                    }
                }
            } else {
                LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
            }
        }
        catch (Exception e) {
            // Still best effort, but keep the cause in the log instead of discarding it.
            LOG.debug(nodeRemovePath + " znode could not be removed!!", e);
        }
    }

    /**
     * Persists a new token in ZooKeeper.
     *
     * @param ident     identifier of the token
     * @param tokenInfo renew date and password to store
     * @throws IOException      declared by the overridden method
     * @throws RuntimeException wrapping any failure raised while writing the token
     */
    @Override
    protected void storeToken(TokenIdent ident, AbstractDelegationTokenSecretManager.DelegationTokenInformation tokenInfo) throws IOException {
        final boolean isUpdate = false;
        try {
            this.addOrUpdateToken(ident, tokenInfo, isUpdate);
        }
        catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Writes the token to ZooKeeper, overwriting its znode when it exists and creating it
     * (with a debug note) when it does not.
     *
     * @param ident     identifier of the token
     * @param tokenInfo renew date and password to store
     * @throws IOException      declared by the overridden method
     * @throws RuntimeException wrapping any failure raised while checking or writing the znode
     */
    @Override
    protected void updateToken(TokenIdent ident, AbstractDelegationTokenSecretManager.DelegationTokenInformation tokenInfo) throws IOException {
        final String tokenPath = ZKDelegationTokenSecretManager.getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
        try {
            boolean nodeExists = this.zkClient.checkExists().forPath(tokenPath) != null;
            this.addOrUpdateToken(ident, tokenInfo, nodeExists);
            if (!nodeExists) {
                LOG.debug("Attempted to update a non-existing znode " + tokenPath);
            }
        }
        catch (Exception failure) {
            throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_" + ident.getSequenceNumber(), failure);
        }
    }

    /**
     * Removes the token's znode without first checking its renew date in ZooKeeper.
     *
     * @param ident identifier of the token to remove
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void removeStoredToken(TokenIdent ident) throws IOException {
        final boolean checkAgainstZkBeforeDeletion = false;
        this.removeStoredToken(ident, checkAgainstZkBeforeDeletion);
    }

    /**
     * Removes the token's znode, retrying until it is no longer reported as existing.
     * Nothing is deleted when ZooKeeper holds no information for the token, or when
     * {@code checkAgainstZkBeforeDeletion} is set and the renew date stored in ZooKeeper is
     * still in the future.
     *
     * @param ident                        identifier of the token to remove
     * @param checkAgainstZkBeforeDeletion whether to keep a token whose stored renew date
     *                                     is later than the current time
     * @throws IOException      declared for callers; not thrown directly here
     * @throws RuntimeException wrapping any failure raised while reading or deleting the znode
     */
    protected void removeStoredToken(TokenIdent ident, boolean checkAgainstZkBeforeDeletion) throws IOException {
        final String tokenPath = ZKDelegationTokenSecretManager.getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
        try {
            AbstractDelegationTokenSecretManager.DelegationTokenInformation zkInfo = this.getTokenInfoFromZK(ident, true);
            if (zkInfo == null) {
                LOG.debug("Attempted to remove a non-existing znode " + tokenPath);
                return;
            }
            if (checkAgainstZkBeforeDeletion && zkInfo.getRenewDate() > Time.now()) {
                LOG.info("Node already renewed by peer " + tokenPath + " so this token should not be deleted");
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Removing ZKDTSMDelegationToken_" + ident.getSequenceNumber());
            }
            while (this.zkClient.checkExists().forPath(tokenPath) != null) {
                try {
                    ((ChildrenDeletable)this.zkClient.delete().guaranteed()).forPath(tokenPath);
                }
                catch (KeeperException.NoNodeException deletedByPeer) {
                    LOG.debug("Node already deleted by peer " + tokenPath);
                }
            }
        }
        catch (Exception failure) {
            throw new RuntimeException("Could not remove Stored Token ZKDTSMDelegationToken_" + ident.getSequenceNumber(), failure);
        }
    }

    /**
     * Cancels {@code token} after first reconciling the local token map with ZooKeeper for
     * the token's identifier.
     *
     * @param token     the token to cancel
     * @param canceller name of the principal requesting the cancellation
     * @return the result of the superclass cancellation
     * @throws IOException if the identifier cannot be decoded or the superclass call fails
     */
    @Override
    public TokenIdent cancelToken(Token<TokenIdent> token, String canceller) throws IOException {
        DataInputStream identStream = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
        TokenIdent decoded = (TokenIdent)this.createIdentifier();
        decoded.readFields(identStream);
        this.syncLocalCacheWithZk(decoded);
        return super.cancelToken(token, canceller);
    }

    /**
     * Serializes the token record (identifier, renew date, password length, password) and
     * writes it to the token's znode: overwrites the data when {@code isUpdate} is set,
     * creates a persistent znode otherwise.
     *
     * @param ident    identifier of the token
     * @param info     renew date and password to store
     * @param isUpdate {@code true} to overwrite an existing znode, {@code false} to create one
     * @throws Exception if serialization or the ZooKeeper write fails
     */
    protected void addOrUpdateToken(TokenIdent ident, AbstractDelegationTokenSecretManager.DelegationTokenInformation info, boolean isUpdate) throws Exception {
        final String tokenPath = ZKDelegationTokenSecretManager.getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             DataOutputStream recordOut = new DataOutputStream(buffer)) {
            ident.write(recordOut);
            recordOut.writeLong(info.getRenewDate());
            recordOut.writeInt(info.getPassword().length);
            recordOut.write(info.getPassword());
            if (LOG.isDebugEnabled()) {
                String action = isUpdate ? "Updating " : "Storing ";
                LOG.debug(action + "ZKDTSMDelegationToken_" + ident.getSequenceNumber());
            }
            if (!isUpdate) {
                ((ACLBackgroundPathAndBytesable)this.zkClient.create().withMode(CreateMode.PERSISTENT)).forPath(tokenPath, buffer.toByteArray());
            } else {
                Stat written = (Stat)this.zkClient.setData().forPath(tokenPath, buffer.toByteArray());
                written.setVersion(-1);
            }
        }
    }

    /**
     * Reports whether the token watcher was enabled in the configuration this manager was
     * built from.
     *
     * @return {@code true} if the token cache is started by {@code startThreads}
     */
    public boolean isTokenWatcherEnabled() {
        final boolean enabled = this.isTokenWatcherEnabled;
        return enabled;
    }

    /**
     * Joins {@code root} and {@code nodeName} with a single {@code '/'}.
     *
     * @param root     parent path
     * @param nodeName name of the child node
     * @return {@code root + "/" + nodeName}
     */
    @InterfaceAudience.Private
    @InterfaceStability.Unstable
    @VisibleForTesting
    static String getNodePath(String root, String nodeName) {
        return String.join("/", root, nodeName);
    }

    /**
     * Returns the token information held in {@code currentTokens} for {@code ident},
     * without consulting ZooKeeper.
     *
     * @param ident identifier of the token
     * @return the locally held information, or {@code null} if the map has no entry
     */
    @VisibleForTesting
    AbstractDelegationTokenSecretManager.DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) {
        Object localEntry = this.currentTokens.get(ident);
        return (AbstractDelegationTokenSecretManager.DelegationTokenInformation)localEntry;
    }
}

