package org.apache.hadoop.hdfs.server.sps;

import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.server.balancer.KeyManager;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.5.101-eep-920.jar:org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.class */
public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ExternalSPSBlockMoveTaskHandler.class);
    private final ExecutorService moveExecutor;
    private final CompletionService<BlockMovementAttemptFinished> mCompletionServ;
    private final NameNodeConnector nnc;
    private final SaslDataTransferClient saslClient;
    private final BlockStorageMovementTracker blkMovementTracker;
    private Daemon movementTrackerThread;
    private final SPSService service;
    private final BlockDispatcher blkDispatcher;

    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.5.101-eep-920.jar:org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler$BlockMovingTask.class */
    private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> {
        private final BlockStorageMovementCommand.BlockMovingInfo blkMovingInfo;

        BlockMovingTask(BlockStorageMovementCommand.BlockMovingInfo blockMovingInfo) {
            this.blkMovingInfo = blockMovingInfo;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public BlockMovementAttemptFinished call() {
            return new BlockMovementAttemptFinished(this.blkMovingInfo.getBlock(), this.blkMovingInfo.getSource(), this.blkMovingInfo.getTarget(), this.blkMovingInfo.getTargetStorageType(), moveBlock());
        }

        private BlockMovementStatus moveBlock() {
            ExtendedBlock extendedBlock = new ExtendedBlock(ExternalSPSBlockMoveTaskHandler.this.nnc.getBlockpoolID(), this.blkMovingInfo.getBlock());
            KeyManager keyManager = ExternalSPSBlockMoveTaskHandler.this.nnc.getKeyManager();
            try {
                return ExternalSPSBlockMoveTaskHandler.this.blkDispatcher.moveBlock(this.blkMovingInfo, ExternalSPSBlockMoveTaskHandler.this.saslClient, extendedBlock, new Socket(), keyManager, keyManager.getAccessToken(extendedBlock, new StorageType[]{this.blkMovingInfo.getTargetStorageType()}, new String[0]));
            } catch (IOException e) {
                ExternalSPSBlockMoveTaskHandler.LOG.warn("Failed to move block:{} from src:{} to destin:{} to satisfy storageType:{}", this.blkMovingInfo.getBlock(), this.blkMovingInfo.getSource(), this.blkMovingInfo.getTarget(), this.blkMovingInfo.getTargetStorageType(), e);
                return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.5.101-eep-920.jar:org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler$ExternalBlocksMovementsStatusHandler.class */
    private class ExternalBlocksMovementsStatusHandler implements BlocksMovementsStatusHandler {
        private ExternalBlocksMovementsStatusHandler() {
        }

        @Override // org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler
        public void handle(BlockMovementAttemptFinished blockMovementAttemptFinished) {
            ExternalSPSBlockMoveTaskHandler.this.service.notifyStorageMovementAttemptFinishedBlk(blockMovementAttemptFinished.getTargetDatanode(), blockMovementAttemptFinished.getTargetType(), blockMovementAttemptFinished.getBlock());
        }
    }

    public ExternalSPSBlockMoveTaskHandler(Configuration configuration, NameNodeConnector nameNodeConnector, SPSService sPSService) {
        this.moveExecutor = initializeBlockMoverThreadPool(configuration.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1000));
        this.mCompletionServ = new ExecutorCompletionService(this.moveExecutor);
        this.nnc = nameNodeConnector;
        this.saslClient = new SaslDataTransferClient(configuration, DataTransferSaslUtil.getSaslPropertiesResolver(configuration), TrustedChannelResolver.getInstance(configuration), nameNodeConnector.getFallbackToSimpleAuth());
        this.blkMovementTracker = new BlockStorageMovementTracker(this.mCompletionServ, new ExternalBlocksMovementsStatusHandler());
        this.service = sPSService;
        this.blkDispatcher = new BlockDispatcher(60000, DFSUtilClient.getIoFileBufferSize(configuration), configuration.getBoolean("dfs.client.use.datanode.hostname", false));
        startMovementTracker();
    }

    private void startMovementTracker() {
        this.movementTrackerThread = new Daemon(this.blkMovementTracker);
        this.movementTrackerThread.setName("BlockStorageMovementTracker");
        this.movementTrackerThread.start();
    }

    private ThreadPoolExecutor initializeBlockMoverThreadPool(int i) {
        LOG.debug("Block mover to satisfy storage policy; pool threads={}", Integer.valueOf(i));
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, i, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new Daemon.DaemonFactory() { // from class: org.apache.hadoop.hdfs.server.sps.ExternalSPSBlockMoveTaskHandler.1
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override // org.apache.hadoop.util.Daemon.DaemonFactory, java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread newThread = super.newThread(runnable);
                newThread.setName("BlockMoverTask-" + this.threadIndex.getAndIncrement());
                return newThread;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy() { // from class: org.apache.hadoop.hdfs.server.sps.ExternalSPSBlockMoveTaskHandler.2
            @Override // java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy, java.util.concurrent.RejectedExecutionHandler
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor2) {
                ExternalSPSBlockMoveTaskHandler.LOG.info("Execution for block movement to satisfy storage policy got rejected, Executing in current thread");
                super.rejectedExecution(runnable, threadPoolExecutor2);
            }
        });
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }

    @Override // org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler
    public void submitMoveTask(BlockStorageMovementCommand.BlockMovingInfo blockMovingInfo) throws IOException {
        LOG.debug("Received BlockMovingTask {}", blockMovingInfo);
        this.mCompletionServ.submit(new BlockMovingTask(blockMovingInfo));
    }

    void cleanUp() {
        this.blkMovementTracker.stopTracking();
        if (this.movementTrackerThread != null) {
            this.movementTrackerThread.interrupt();
        }
    }
}
