package org.apache.hadoop.hdfs.server.datanode.erasurecode;

import java.util.Collection;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;

@InterfaceAudience.Private
/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.5.1-eep-912.jar:org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.class */
public final class ErasureCodingWorker {
    private static final Logger LOG = DataNode.LOG;
    private final DataNode datanode;
    private final Configuration conf;
    private final float xmitWeight;
    private ThreadPoolExecutor stripedReconstructionPool;
    private ThreadPoolExecutor stripedReadPool;

    public ErasureCodingWorker(Configuration configuration, DataNode dataNode) {
        this.datanode = dataNode;
        this.conf = configuration;
        this.xmitWeight = configuration.getFloat(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY, 0.5f);
        Preconditions.checkArgument(this.xmitWeight >= 0.0f, "Invalid value configured for dfs.datanode.ec.reconstruction.xmits.weight, it can not be negative value (" + this.xmitWeight + ").");
        initializeStripedReadThreadPool();
        initializeStripedBlkReconstructionThreadPool(configuration.getInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_KEY, 8));
    }

    private void initializeStripedReadThreadPool() {
        LOG.debug("Using striped reads");
        this.stripedReadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new Daemon.DaemonFactory() { // from class: org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker.1
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override // org.apache.hadoop.util.Daemon.DaemonFactory, java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread newThread = super.newThread(runnable);
                newThread.setName("stripedRead-" + this.threadIndex.getAndIncrement());
                return newThread;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy() { // from class: org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker.2
            @Override // java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy, java.util.concurrent.RejectedExecutionHandler
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                ErasureCodingWorker.LOG.info("Execution for striped reading rejected, Executing in current thread");
                super.rejectedExecution(runnable, threadPoolExecutor);
            }
        });
        this.stripedReadPool.allowCoreThreadTimeOut(true);
    }

    private void initializeStripedBlkReconstructionThreadPool(int i) {
        LOG.debug("Using striped block reconstruction; pool threads={}", Integer.valueOf(i));
        this.stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(i, i, 60L, new LinkedBlockingQueue(), "StripedBlockReconstruction-", false);
        this.stripedReconstructionPool.allowCoreThreadTimeOut(true);
    }

    public void processErasureCodingTasks(Collection<BlockECReconstructionCommand.BlockECReconstructionInfo> collection) {
        for (BlockECReconstructionCommand.BlockECReconstructionInfo blockECReconstructionInfo : collection) {
            try {
                StripedBlockReconstructor stripedBlockReconstructor = new StripedBlockReconstructor(this, new StripedReconstructionInfo(blockECReconstructionInfo.getExtendedBlock(), blockECReconstructionInfo.getErasureCodingPolicy(), blockECReconstructionInfo.getLiveBlockIndices(), blockECReconstructionInfo.getSourceDnInfos(), blockECReconstructionInfo.getTargetDnInfos(), blockECReconstructionInfo.getTargetStorageTypes(), blockECReconstructionInfo.getTargetStorageIDs()));
                if (stripedBlockReconstructor.hasValidTargets()) {
                    this.stripedReconstructionPool.submit(stripedBlockReconstructor);
                    getDatanode().incrementXmitsInProcess(Math.max((int) (stripedBlockReconstructor.getXmits() * this.xmitWeight), 1));
                } else {
                    LOG.warn("No missing internal block. Skip reconstruction for task:{}", blockECReconstructionInfo);
                }
            } catch (Throwable th) {
                LOG.warn("Failed to reconstruct striped block {}", blockECReconstructionInfo.getExtendedBlock().getLocalBlock(), th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DataNode getDatanode() {
        return this.datanode;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Configuration getConf() {
        return this.conf;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletionService<StripedBlockUtil.BlockReadStats> createReadService() {
        return new ExecutorCompletionService(this.stripedReadPool);
    }

    public void shutDown() {
        this.stripedReconstructionPool.shutdown();
        this.stripedReadPool.shutdown();
    }

    public float getXmitWeight() {
        return this.xmitWeight;
    }
}
