package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionHandler;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.4.202-eep-911.jar:org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.class */
public final class ReencryptionUpdater implements Runnable {
    public static final Logger LOG;
    private double throttleLimitRatio;
    private final FSDirectory dir;
    private final CompletionService<ReencryptionTask> batchService;
    private final ReencryptionHandler handler;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile boolean shouldPauseForTesting = false;
    private volatile int pauseAfterNthCheckpoint = 0;
    private volatile long pauseZoneId = 0;
    private final StopWatch throttleTimerAll = new StopWatch();
    private final StopWatch throttleTimerLocked = new StopWatch();
    private volatile long faultRetryInterval = 60000;
    private volatile boolean isRunning = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.4.202-eep-911.jar:org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater$FileEdekInfo.class */
    public static final class FileEdekInfo {
        private final long inodeId;
        private final KeyProviderCryptoExtension.EncryptedKeyVersion existingEdek;
        private KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: package-private */
        public FileEdekInfo(FSDirectory fSDirectory, INodeFile iNodeFile) throws IOException {
            if (!$assertionsDisabled && !fSDirectory.hasReadLock()) {
                throw new AssertionError();
            }
            Preconditions.checkNotNull(iNodeFile, "INodeFile is null");
            this.inodeId = iNodeFile.getId();
            FileEncryptionInfo fileEncryptionInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(fSDirectory, INodesInPath.fromINode(iNodeFile));
            Preconditions.checkNotNull(fileEncryptionInfo, "FileEncryptionInfo is null for " + this.inodeId);
            this.existingEdek = KeyProviderCryptoExtension.EncryptedKeyVersion.createForDecryption(fileEncryptionInfo.getKeyName(), fileEncryptionInfo.getEzKeyVersionName(), fileEncryptionInfo.getIV(), fileEncryptionInfo.getEncryptedDataEncryptionKey());
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public long getInodeId() {
            return this.inodeId;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public KeyProviderCryptoExtension.EncryptedKeyVersion getExistingEdek() {
            return this.existingEdek;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void setEdek(KeyProviderCryptoExtension.EncryptedKeyVersion encryptedKeyVersion) {
            if (!$assertionsDisabled && encryptedKeyVersion == null) {
                throw new AssertionError();
            }
            this.edek = encryptedKeyVersion;
        }

        static {
            $assertionsDisabled = !ReencryptionUpdater.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.4.202-eep-911.jar:org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater$ReencryptionTask.class */
    public static final class ReencryptionTask {
        private final long zoneId;
        private int numFailures;
        private final ReencryptionHandler.ReencryptionBatch batch;
        private boolean processed = false;
        private int numFilesUpdated = 0;
        private String lastFile = null;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ReencryptionTask(long j, int i, ReencryptionHandler.ReencryptionBatch reencryptionBatch) {
            this.numFailures = 0;
            this.zoneId = j;
            this.numFailures = i;
            this.batch = reencryptionBatch;
        }

        static /* synthetic */ int access$604(ReencryptionTask reencryptionTask) {
            int i = reencryptionTask.numFilesUpdated + 1;
            reencryptionTask.numFilesUpdated = i;
            return i;
        }

        static /* synthetic */ int access$204(ReencryptionTask reencryptionTask) {
            int i = reencryptionTask.numFailures + 1;
            reencryptionTask.numFailures = i;
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.4.202-eep-911.jar:org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater$ZoneSubmissionTracker.class */
    public static final class ZoneSubmissionTracker {
        private boolean submissionDone = false;
        private LinkedList<Future> tasks = new LinkedList<>();
        private int numCheckpointed = 0;
        private int numFutureDone = 0;

        /* JADX INFO: Access modifiers changed from: package-private */
        public void reset() {
            this.submissionDone = false;
            this.tasks.clear();
            this.numCheckpointed = 0;
            this.numFutureDone = 0;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public LinkedList<Future> getTasks() {
            return this.tasks;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void cancelAllTasks() {
            if (this.tasks.isEmpty()) {
                return;
            }
            ReencryptionUpdater.LOG.info("Cancelling {} re-encryption tasks", Integer.valueOf(this.tasks.size()));
            Iterator<Future> it = this.tasks.iterator();
            while (it.hasNext()) {
                it.next().cancel(true);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void addTask(Future future) {
            this.tasks.add(future);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isCompleted() {
            return this.submissionDone && this.tasks.isEmpty();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void setSubmissionDone() {
            this.submissionDone = true;
        }

        static /* synthetic */ int access$904(ZoneSubmissionTracker zoneSubmissionTracker) {
            int i = zoneSubmissionTracker.numCheckpointed + 1;
            zoneSubmissionTracker.numCheckpointed = i;
            return i;
        }

        static /* synthetic */ int access$1108(ZoneSubmissionTracker zoneSubmissionTracker) {
            int i = zoneSubmissionTracker.numFutureDone;
            zoneSubmissionTracker.numFutureDone = i + 1;
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public synchronized void pauseForTesting() {
        this.shouldPauseForTesting = true;
        LOG.info("Pausing re-encrypt updater for testing.");
        notify();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public synchronized void resumeForTesting() {
        this.shouldPauseForTesting = false;
        LOG.info("Resuming re-encrypt updater for testing.");
        notify();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void pauseForTestingAfterNthCheckpoint(long j, int i) {
        if (!$assertionsDisabled && this.pauseAfterNthCheckpoint != 0) {
            throw new AssertionError();
        }
        this.pauseAfterNthCheckpoint = i;
        this.pauseZoneId = j;
    }

    @VisibleForTesting
    boolean isRunning() {
        return this.isRunning;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReencryptionUpdater(FSDirectory fSDirectory, CompletionService<ReencryptionTask> completionService, ReencryptionHandler reencryptionHandler, Configuration configuration) {
        this.dir = fSDirectory;
        this.batchService = completionService;
        this.handler = reencryptionHandler;
        this.throttleLimitRatio = configuration.getDouble(DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY, 1.0d);
        Preconditions.checkArgument(this.throttleLimitRatio > CMAESOptimizer.DEFAULT_STOPFITNESS, "dfs.namenode.reencrypt.throttle.limit.updater.ratio is not positive.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void markZoneSubmissionDone(long j) throws IOException, InterruptedException {
        ZoneSubmissionTracker tracker = this.handler.getTracker(j);
        if (tracker == null || tracker.getTasks().isEmpty()) {
            this.handler.addDummyTracker(j, tracker);
        } else {
            tracker.submissionDone = true;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        this.isRunning = true;
        this.throttleTimerAll.start();
        while (true) {
            try {
                takeAndProcessTasks();
            } catch (IOException | CancellationException e) {
                LOG.warn("Re-encryption updater thread exception.", e);
            } catch (InterruptedException e2) {
                LOG.warn("Re-encryption updater thread interrupted. Exiting.");
                Thread.currentThread().interrupt();
                this.isRunning = false;
                return;
            } catch (Throwable th) {
                LOG.error("Re-encryption updater thread exiting.", th);
                this.isRunning = false;
                return;
            }
        }
    }

    private void processTaskEntries(String str, ReencryptionTask reencryptionTask) throws IOException, InterruptedException {
        if (!$assertionsDisabled && !this.dir.hasWriteLock()) {
            throw new AssertionError();
        }
        if (!reencryptionTask.batch.isEmpty() && reencryptionTask.numFailures == 0) {
            LOG.debug("Updating file xattrs for re-encrypting zone {}, starting at {}", str, reencryptionTask.batch.getFirstFilePath());
            int size = reencryptionTask.batch.size();
            Iterator<FileEdekInfo> it = reencryptionTask.batch.getBatch().iterator();
            while (it.hasNext()) {
                FileEdekInfo next = it.next();
                LOG.trace("Updating {} for re-encryption.", Long.valueOf(next.getInodeId()));
                INode inode = this.dir.getInode(next.getInodeId());
                if (inode == null) {
                    LOG.debug("INode {} doesn't exist, skipping re-encrypt.", Long.valueOf(next.getInodeId()));
                    it.remove();
                } else {
                    Preconditions.checkNotNull(next.edek);
                    FileEncryptionInfo fileEncryptionInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(this.dir, INodesInPath.fromINode(inode));
                    if (!fileEncryptionInfo.getKeyName().equals(next.edek.getEncryptionKeyName())) {
                        LOG.debug("Inode {} EZ key changed, skipping re-encryption.", Long.valueOf(next.getInodeId()));
                        it.remove();
                    } else if (fileEncryptionInfo.getEzKeyVersionName().equals(next.edek.getEncryptionKeyVersionName())) {
                        LOG.debug("Inode {} EZ key version unchanged, skipping re-encryption.", Long.valueOf(next.getInodeId()));
                        it.remove();
                    } else if (Arrays.equals(fileEncryptionInfo.getEncryptedDataEncryptionKey(), next.existingEdek.getEncryptedKeyVersion().getMaterial())) {
                        FileEncryptionInfo fileEncryptionInfo2 = new FileEncryptionInfo(fileEncryptionInfo.getCipherSuite(), fileEncryptionInfo.getCryptoProtocolVersion(), next.edek.getEncryptedKeyVersion().getMaterial(), next.edek.getEncryptedKeyIv(), fileEncryptionInfo.getKeyName(), next.edek.getEncryptionKeyVersionName());
                        INodesInPath fromINode = INodesInPath.fromINode(inode);
                        FSDirEncryptionZoneOp.setFileEncryptionInfo(this.dir, fromINode, fileEncryptionInfo2, XAttrSetFlag.REPLACE);
                        reencryptionTask.lastFile = fromINode.getPath();
                        ReencryptionTask.access$604(reencryptionTask);
                    } else {
                        LOG.debug("Inode {} existing edek changed, skipping re-encryption", Long.valueOf(next.getInodeId()));
                        it.remove();
                    }
                }
            }
            LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption, starting:{}.", Integer.valueOf(reencryptionTask.numFilesUpdated), Integer.valueOf(size), str, reencryptionTask.batch.getFirstFilePath());
        }
        reencryptionTask.processed = true;
    }

    private List<XAttr> processCheckpoints(INode iNode, ZoneSubmissionTracker zoneSubmissionTracker) throws ExecutionException, IOException, InterruptedException {
        if (!$assertionsDisabled && !this.dir.hasWriteLock()) {
            throw new AssertionError();
        }
        long id = iNode.getId();
        String fullPathName = iNode.getFullPathName();
        ZoneReencryptionStatus zoneStatus = this.handler.getReencryptionStatus().getZoneStatus(Long.valueOf(id));
        if (!$assertionsDisabled && zoneStatus == null) {
            throw new AssertionError();
        }
        LinkedList<Future> tasks = zoneSubmissionTracker.getTasks();
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(1);
        ListIterator<Future> listIterator = tasks.listIterator();
        synchronized (this.handler) {
            while (listIterator.hasNext()) {
                Future next = listIterator.next();
                if (next.isCancelled() || !next.isDone() || !((ReencryptionTask) next.get()).processed) {
                    break;
                }
                ReencryptionTask reencryptionTask = (ReencryptionTask) next.get();
                LOG.debug("Updating re-encryption checkpoint with completed task. last: {} size:{}.", reencryptionTask.lastFile, Integer.valueOf(reencryptionTask.batch.size()));
                if (!$assertionsDisabled && id != reencryptionTask.zoneId) {
                    throw new AssertionError();
                }
                try {
                    XAttr updateReencryptionProgress = FSDirEncryptionZoneOp.updateReencryptionProgress(this.dir, iNode, zoneStatus, reencryptionTask.lastFile, reencryptionTask.numFilesUpdated, reencryptionTask.numFailures);
                    newArrayListWithCapacity.clear();
                    newArrayListWithCapacity.add(updateReencryptionProgress);
                } catch (IOException e) {
                    LOG.warn("Failed to update re-encrypted progress to xattr for zone {}", fullPathName, e);
                    ReencryptionTask.access$204(reencryptionTask);
                }
                ZoneSubmissionTracker.access$904(zoneSubmissionTracker);
                listIterator.remove();
            }
        }
        if (!zoneSubmissionTracker.isCompleted()) {
            return newArrayListWithCapacity;
        }
        LOG.debug("Removed re-encryption tracker for zone {} because it completed with {} tasks.", fullPathName, Integer.valueOf(zoneSubmissionTracker.numCheckpointed));
        return this.handler.completeReencryption(iNode);
    }

    private void takeAndProcessTasks() throws Exception {
        boolean z;
        Future<ReencryptionTask> take = this.batchService.take();
        throttle();
        checkPauseForTesting();
        if (take.isCancelled()) {
            LOG.debug("Skipped a canceled re-encryption task");
            return;
        }
        ReencryptionTask reencryptionTask = take.get();
        do {
            this.dir.getFSNamesystem().writeLock();
            try {
                try {
                    this.throttleTimerLocked.start();
                    processTask(reencryptionTask);
                    z = false;
                    this.dir.getFSNamesystem().writeUnlock("reencryptUpdater");
                    this.throttleTimerLocked.stop();
                } catch (SafeModeException | RetriableException e) {
                    LOG.info("Exception when processing re-encryption task for zone {}, retrying...", Long.valueOf(reencryptionTask.zoneId), e);
                    z = true;
                    Thread.sleep(this.faultRetryInterval);
                    this.dir.getFSNamesystem().writeUnlock("reencryptUpdater");
                    this.throttleTimerLocked.stop();
                } catch (IOException e2) {
                    LOG.warn("Failure processing re-encryption task for zone {}", Long.valueOf(reencryptionTask.zoneId), e2);
                    ReencryptionTask.access$204(reencryptionTask);
                    reencryptionTask.processed = true;
                    z = false;
                    this.dir.getFSNamesystem().writeUnlock("reencryptUpdater");
                    this.throttleTimerLocked.stop();
                }
                this.dir.getEditLog().logSync();
            } catch (Throwable th) {
                this.dir.getFSNamesystem().writeUnlock("reencryptUpdater");
                this.throttleTimerLocked.stop();
                throw th;
            }
        } while (z);
    }

    private void processTask(ReencryptionTask reencryptionTask) throws InterruptedException, ExecutionException, IOException {
        this.dir.writeLock();
        try {
            this.handler.getTraverser().checkINodeReady(reencryptionTask.zoneId);
            INode inode = this.dir.getInode(reencryptionTask.zoneId);
            if (inode == null) {
                return;
            }
            String fullPathName = inode.getFullPathName();
            LOG.info("Processing returned re-encryption task for zone {}({}), batch size {}, start:{}", fullPathName, Long.valueOf(reencryptionTask.zoneId), Integer.valueOf(reencryptionTask.batch.size()), reencryptionTask.batch.getFirstFilePath());
            ZoneSubmissionTracker tracker = this.handler.getTracker(inode.getId());
            if (tracker == null) {
                LOG.info("Re-encryption was canceled.");
                this.dir.writeUnlock();
                return;
            }
            ZoneSubmissionTracker.access$1108(tracker);
            EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
            processTaskEntries(fullPathName, reencryptionTask);
            EncryptionFaultInjector.getInstance().reencryptUpdaterProcessCheckpoint();
            List<XAttr> processCheckpoints = processCheckpoints(inode, tracker);
            this.dir.writeUnlock();
            FSDirEncryptionZoneOp.saveFileXAttrsForBatch(this.dir, reencryptionTask.batch.getBatch());
            if (processCheckpoints.isEmpty()) {
                return;
            }
            this.dir.getEditLog().logSetXAttrs(fullPathName, processCheckpoints, false);
        } finally {
            this.dir.writeUnlock();
        }
    }

    private synchronized void checkPauseForTesting() throws InterruptedException {
        ZoneSubmissionTracker unprotectedGetTracker;
        if (!$assertionsDisabled && this.dir.hasWriteLock()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.dir.getFSNamesystem().hasWriteLock()) {
            throw new AssertionError();
        }
        if (this.pauseAfterNthCheckpoint != 0 && (unprotectedGetTracker = this.handler.unprotectedGetTracker(this.pauseZoneId)) != null && unprotectedGetTracker.numFutureDone == this.pauseAfterNthCheckpoint) {
            this.shouldPauseForTesting = true;
            this.pauseAfterNthCheckpoint = 0;
        }
        while (this.shouldPauseForTesting) {
            LOG.info("Sleeping in the re-encryption updater for unit test.");
            wait();
            LOG.info("Continuing re-encryption updater after pausing.");
        }
    }

    private void throttle() throws InterruptedException {
        if (this.throttleLimitRatio >= 1.0d) {
            return;
        }
        long now = (long) (this.throttleTimerAll.now(TimeUnit.MILLISECONDS) * this.throttleLimitRatio);
        long now2 = this.throttleTimerLocked.now(TimeUnit.MILLISECONDS);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Re-encryption updater throttling expect: {}, actual: {}, throttleTimerAll:{}", Long.valueOf(now), Long.valueOf(now2), Long.valueOf(this.throttleTimerAll.now(TimeUnit.MILLISECONDS)));
        }
        if (now - now2 < 0) {
            long now3 = ((long) (now2 / this.throttleLimitRatio)) - this.throttleTimerAll.now(TimeUnit.MILLISECONDS);
            LOG.debug("Throttling re-encryption, sleeping for {} ms", Long.valueOf(now3));
            Thread.sleep(now3);
        }
        this.throttleTimerAll.reset().start();
        this.throttleTimerLocked.reset();
    }

    static {
        $assertionsDisabled = !ReencryptionUpdater.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) ReencryptionUpdater.class);
    }
}
