package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.Preconditions;
import org.apache.zookeeper.audit.AuditConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.5.401-eep-930.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.class */
public class FsDatasetAsyncDiskService {
    /** Logger used by this service and by the tasks it runs. */
    public static final Logger LOG = LoggerFactory.getLogger(FsDatasetAsyncDiskService.class);

    /** Core pool size for a per-volume executor. */
    private static final int CORE_THREADS_PER_VOLUME = 1;

    /** Upper bound on pool size for a per-volume executor; read from configuration in the constructor. */
    private final int maxNumThreadsPerVolume;

    /** Idle time, in seconds, after which a worker thread may be reclaimed. */
    private static final long THREADS_KEEP_ALIVE_SECONDS = 60;

    /** DataNode that is notified when a block has been deleted. */
    private final DataNode datanode;

    /** Dataset that is told to forget deleted block ids once a batch has accumulated. */
    private final FsDatasetImpl fsdatasetImpl;

    /** Number of processed deletions that are batched before the dataset is informed. */
    private static final int MAX_DELETED_BLOCKS = 64;

    /** Per-volume executors keyed by storage ID; set to {@code null} once {@link #shutdown()} has run. */
    private Map<String, ThreadPoolExecutor> executors = new HashMap<>();

    /** Block ids processed since the last batch flush, grouped by block pool id. */
    private Map<String, Set<Long>> deletedBlockIds = new HashMap<>();

    /** Count of deletions recorded in {@link #deletedBlockIds} since the last flush. */
    private int numDeletedBlocks = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.5.401-eep-930.jar:org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService$ReplicaFileDeleteTask.class */
    public class ReplicaFileDeleteTask implements Runnable {
        private final FsVolumeReference volumeRef;
        private final FsVolumeImpl volume;
        private final ReplicaInfo replicaToDelete;
        private final ExtendedBlock block;
        private final String trashDirectory;

        ReplicaFileDeleteTask(FsVolumeReference fsVolumeReference, ReplicaInfo replicaInfo, ExtendedBlock extendedBlock, String str) {
            this.volumeRef = fsVolumeReference;
            this.volume = (FsVolumeImpl) fsVolumeReference.getVolume();
            this.replicaToDelete = replicaInfo;
            this.block = extendedBlock;
            this.trashDirectory = str;
        }

        public String toString() {
            return "deletion of block " + this.block.getBlockPoolId() + " " + this.block.getLocalBlock() + " with block file " + this.replicaToDelete.getBlockURI() + " and meta file " + this.replicaToDelete.getMetadataURI() + " from volume " + this.volume;
        }

        private boolean deleteFiles() {
            return this.replicaToDelete.deleteBlockData() && (this.replicaToDelete.deleteMetadata() || !this.replicaToDelete.metadataExists());
        }

        private boolean moveFiles() {
            if (this.trashDirectory == null) {
                FsDatasetAsyncDiskService.LOG.error("Trash dir for replica " + this.replicaToDelete + " is null");
                return false;
            }
            try {
                this.volume.getFileIoProvider().mkdirsWithExistsCheck(this.volume, new File(this.trashDirectory));
                if (FsDatasetAsyncDiskService.LOG.isDebugEnabled()) {
                    FsDatasetAsyncDiskService.LOG.debug("Moving files " + this.replicaToDelete.getBlockURI() + " and " + this.replicaToDelete.getMetadataURI() + " to trash.");
                }
                String blockName = this.replicaToDelete.getBlockName();
                long generationStamp = this.replicaToDelete.getGenerationStamp();
                File file = new File(this.trashDirectory, blockName);
                File file2 = new File(this.trashDirectory, DatanodeUtil.getMetaName(blockName, generationStamp));
                try {
                    if (this.replicaToDelete.renameData(file.toURI())) {
                        if (this.replicaToDelete.renameMeta(file2.toURI())) {
                            return true;
                        }
                    }
                    return false;
                } catch (IOException e) {
                    FsDatasetAsyncDiskService.LOG.error("Error moving files to trash: " + this.replicaToDelete, (Throwable) e);
                    return false;
                }
            } catch (IOException e2) {
                return false;
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                long blockDataLength = this.replicaToDelete.getBlockDataLength();
                long metadataLength = this.replicaToDelete.getMetadataLength();
                if (this.trashDirectory == null ? deleteFiles() : moveFiles()) {
                    if (this.block.getLocalBlock().getNumBytes() != Long.MAX_VALUE) {
                        FsDatasetAsyncDiskService.this.datanode.notifyNamenodeDeletedBlock(this.block, this.volume.getStorageID());
                    }
                    this.volume.onBlockFileDeletion(this.block.getBlockPoolId(), blockDataLength);
                    this.volume.onMetaFileDeletion(this.block.getBlockPoolId(), metadataLength);
                    FsDatasetAsyncDiskService.LOG.info("Deleted " + this.block.getBlockPoolId() + " " + this.block.getLocalBlock() + " URI " + this.replicaToDelete.getBlockURI());
                } else {
                    FsDatasetAsyncDiskService.LOG.warn("Unexpected error trying to " + (this.trashDirectory == null ? AuditConstants.OP_DELETE : "move") + " block " + this.block.getBlockPoolId() + " " + this.block.getLocalBlock() + " at file " + this.replicaToDelete.getBlockURI() + ". Ignored.");
                }
                FsDatasetAsyncDiskService.this.updateDeletedBlockId(this.block);
                IOUtils.cleanupWithLogger(null, this.volumeRef);
            } catch (Throwable th) {
                IOUtils.cleanupWithLogger(null, this.volumeRef);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FsDatasetAsyncDiskService(DataNode dataNode, FsDatasetImpl fsDatasetImpl) {
        this.datanode = dataNode;
        this.fsdatasetImpl = fsDatasetImpl;
        this.maxNumThreadsPerVolume = dataNode.getConf().getInt(DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY, 4);
        Preconditions.checkArgument(this.maxNumThreadsPerVolume > 0, "dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume must be a positive integer.");
    }

    /**
     * Creates the thread pool serving the given volume and registers it under the
     * volume's storage ID. Worker threads are named after the volume, and core
     * threads are allowed to time out so that an idle volume holds no threads.
     *
     * <p>NOTE(review): the work queue is unbounded, and per the
     * {@link ThreadPoolExecutor} documentation a pool with an unbounded queue never
     * grows beyond its core size, so {@code maxNumThreadsPerVolume} may have no
     * effect here - confirm whether that is intended.
     *
     * @param fsVolumeImpl volume to create an executor for; callers are expected to
     *                     have checked that it is non-null and not yet registered
     */
    private void addExecutorForVolume(final FsVolumeImpl fsVolumeImpl) {
        final ThreadFactory workerFactory = new ThreadFactory() {
            private int counter = 0;

            @Override
            public Thread newThread(Runnable runnable) {
                final int workerIndex;
                synchronized (this) {
                    workerIndex = this.counter++;
                }
                Thread worker = new Thread(runnable);
                worker.setName("Async disk worker #" + workerIndex + " for volume " + fsVolumeImpl);
                return worker;
            }
        };
        final ThreadPoolExecutor volumeExecutor = new ThreadPoolExecutor(
            CORE_THREADS_PER_VOLUME, this.maxNumThreadsPerVolume,
            THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), workerFactory);
        volumeExecutor.allowCoreThreadTimeOut(true);
        this.executors.put(fsVolumeImpl.getStorageID(), volumeExecutor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a volume with the service by creating an executor for it.
     *
     * @param fsVolumeImpl volume to register
     * @throws RuntimeException if the service has been shut down, the volume is
     *                          {@code null}, or an executor already exists for the
     *                          volume's storage ID
     */
    public synchronized void addVolume(FsVolumeImpl fsVolumeImpl) {
        final Map<String, ThreadPoolExecutor> registered = this.executors;
        if (registered == null) {
            throw new RuntimeException("AsyncDiskService is already shutdown");
        }
        if (fsVolumeImpl == null) {
            throw new RuntimeException("Attempt to add a null volume");
        }
        final ThreadPoolExecutor existing = registered.get(fsVolumeImpl.getStorageID());
        if (existing != null) {
            throw new RuntimeException("Volume " + fsVolumeImpl + " is already existed.");
        }
        addExecutorForVolume(fsVolumeImpl);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts down and unregisters the executor of the volume with the given storage ID.
     *
     * @param str storage ID of the volume to remove
     * @throws RuntimeException if the service has been shut down or no executor is
     *                          registered for the storage ID
     */
    public synchronized void removeVolume(String str) {
        final Map<String, ThreadPoolExecutor> registered = this.executors;
        if (registered == null) {
            throw new RuntimeException("AsyncDiskService is already shutdown");
        }
        final ThreadPoolExecutor volumeExecutor = registered.get(str);
        if (volumeExecutor != null) {
            // Stop accepting work for the volume before dropping the registration.
            volumeExecutor.shutdown();
            registered.remove(str);
            return;
        }
        throw new RuntimeException("Can not find volume with storageId " + str + " to remove.");
    }

    /**
     * Counts the tasks that have been handed to the per-volume executors but have
     * not completed yet. Every task kind run through the executors is included in
     * the total.
     *
     * @return number of outstanding tasks across all volumes, or {@code 0} once the
     *         service has been shut down
     */
    synchronized long countPendingDeletions() {
        // shutdown() nulls the map; report nothing pending rather than fail with
        // a NullPointerException.
        if (this.executors == null) {
            return 0L;
        }
        long pending = 0;
        for (ThreadPoolExecutor volumeExecutor : this.executors.values()) {
            pending += volumeExecutor.getTaskCount() - volumeExecutor.getCompletedTaskCount();
        }
        return pending;
    }

    /**
     * Hands a task to the executor of the given volume.
     *
     * <p>If the task cannot be submitted and it is a {@link ReplicaFileDeleteTask},
     * the volume reference held by the task is released before the failure is
     * rethrown, because the task will never run to release it itself.
     *
     * @param fsVolumeImpl volume whose executor should run the task
     * @param runnable     task to run
     * @throws RuntimeException if the service has been shut down, the volume is
     *                          {@code null} or unknown, or the executor refuses the task
     */
    synchronized void execute(FsVolumeImpl fsVolumeImpl, Runnable runnable) {
        try {
            executorFor(fsVolumeImpl, runnable).execute(runnable);
        } catch (RuntimeException submissionFailure) {
            if (runnable instanceof ReplicaFileDeleteTask) {
                ReplicaFileDeleteTask abandonedTask = (ReplicaFileDeleteTask) runnable;
                IOUtils.cleanupWithLogger(null, abandonedTask.volumeRef);
            }
            throw submissionFailure;
        }
    }

    /** Looks up the executor registered for a volume, failing if the service or volume is unusable. */
    private ThreadPoolExecutor executorFor(FsVolumeImpl fsVolumeImpl, Runnable runnable) {
        if (this.executors == null) {
            throw new RuntimeException("AsyncDiskService is already shutdown");
        }
        if (fsVolumeImpl == null) {
            throw new RuntimeException("A null volume does not have a executor");
        }
        final ThreadPoolExecutor volumeExecutor = this.executors.get(fsVolumeImpl.getStorageID());
        if (volumeExecutor == null) {
            throw new RuntimeException("Cannot find volume " + fsVolumeImpl + " for execution of task " + runnable);
        }
        return volumeExecutor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Initiates shutdown of every per-volume executor and marks the service as
     * shut down. Calling it again only logs a warning.
     *
     * <p>{@link ThreadPoolExecutor#shutdown()} does not wait for running or queued
     * tasks, so work may still be in progress when this method returns.
     */
    public synchronized void shutdown() {
        final Map<String, ThreadPoolExecutor> registered = this.executors;
        if (registered != null) {
            LOG.info("Shutting down all async disk service threads");
            for (ThreadPoolExecutor volumeExecutor : registered.values()) {
                volumeExecutor.shutdown();
            }
            // A null map is how the other methods recognise a stopped service.
            this.executors = null;
            LOG.info("All async disk service threads have been shut down");
        } else {
            LOG.warn("AsyncDiskService has already shut down.");
        }
    }

    /**
     * Schedules a {@code syncFileRangeIfPossible} call on the executor of the given
     * volume. A {@link NativeIOException} raised by the call is logged as a warning
     * and not propagated.
     *
     * @param fsVolumeImpl         volume whose executor runs the request
     * @param replicaOutputStreams streams the sync is issued on
     * @param j                    file range offset
     * @param j2                   file range length
     * @param i                    flags passed through to the sync call
     */
    public void submitSyncFileRangeRequest(FsVolumeImpl fsVolumeImpl, ReplicaOutputStreams replicaOutputStreams, long j, long j2, int i) {
        final long offset = j;
        final long length = j2;
        final int flags = i;
        final Runnable syncTask = () -> {
            try {
                replicaOutputStreams.syncFileRangeIfPossible(offset, length, flags);
            } catch (NativeIOException syncFailure) {
                try {
                    LOG.warn("sync_file_range error. Volume: {}, Capacity: {}, Available space: {}, File range offset: {}, length: {}, flags: {}",
                        fsVolumeImpl, fsVolumeImpl.getCapacity(), fsVolumeImpl.getAvailable(),
                        offset, length, flags, syncFailure);
                } catch (IOException availabilityFailure) {
                    // Gathering the values for the detailed message failed; log
                    // the original sync failure without the available space.
                    LOG.warn("sync_file_range error. Volume: {}, Capacity: {}, File range offset: {}, length: {}, flags: {}",
                        fsVolumeImpl, fsVolumeImpl.getCapacity(), offset, length, flags, syncFailure);
                }
            }
        };
        execute(fsVolumeImpl, syncTask);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues removal of a replica's files on the executor of the volume that holds
     * the replica.
     *
     * @param fsVolumeReference reference to the replica's volume; handed to the task
     * @param replicaInfo       replica whose files are removed
     * @param extendedBlock     block the replica belongs to
     * @param str               trash directory to move the files into, or
     *                          {@code null} to delete them
     */
    public void deleteAsync(FsVolumeReference fsVolumeReference, ReplicaInfo replicaInfo, ExtendedBlock extendedBlock, String str) {
        final String announcement =
            "Scheduling " + extendedBlock.getLocalBlock() + " replica " + replicaInfo + " for deletion";
        LOG.info(announcement);
        final FsVolumeImpl owningVolume = (FsVolumeImpl) fsVolumeReference.getVolume();
        final ReplicaFileDeleteTask deleteTask =
            new ReplicaFileDeleteTask(fsVolumeReference, replicaInfo, extendedBlock, str);
        execute(owningVolume, deleteTask);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a replica's files on the calling thread by running a
     * {@link ReplicaFileDeleteTask} directly instead of queueing it.
     *
     * @param fsVolumeReference reference to the replica's volume; handed to the task
     * @param replicaInfo       replica whose files are removed
     * @param extendedBlock     block the replica belongs to
     * @param str               trash directory to move the files into, or
     *                          {@code null} to delete them
     */
    public void deleteSync(FsVolumeReference fsVolumeReference, ReplicaInfo replicaInfo, ExtendedBlock extendedBlock, String str) {
        final String announcement = "Deleting " + extendedBlock.getLocalBlock() + " replica " + replicaInfo;
        LOG.info(announcement);
        final ReplicaFileDeleteTask deleteTask =
            new ReplicaFileDeleteTask(fsVolumeReference, replicaInfo, extendedBlock, str);
        deleteTask.run();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records that a deletion task has processed the given block and, once
     * {@code MAX_DELETED_BLOCKS} blocks have accumulated, passes every recorded id
     * to the dataset (per block pool) and resets the batch.
     *
     * @param extendedBlock block whose id and block pool id are recorded
     */
    public synchronized void updateDeletedBlockId(ExtendedBlock extendedBlock) {
        this.deletedBlockIds
            .computeIfAbsent(extendedBlock.getBlockPoolId(), poolId -> new HashSet<>())
            .add(extendedBlock.getBlockId());
        this.numDeletedBlocks++;
        // ">=" rather than "==": if a flush throws before the counter is reset, the
        // next call must still trigger a flush instead of stepping past the
        // threshold and letting the recorded ids grow without bound.
        if (this.numDeletedBlocks >= MAX_DELETED_BLOCKS) {
            for (Map.Entry<String, Set<Long>> entry : this.deletedBlockIds.entrySet()) {
                final Set<Long> blockIds = entry.getValue();
                this.fsdatasetImpl.removeDeletedBlocks(entry.getKey(), blockIds);
                // Keep the per-pool set for reuse, but empty it for the next batch.
                blockIds.clear();
            }
            this.numDeletedBlocks = 0;
        }
    }
}
