package org.apache.hadoop.hdfs.server.common.sps;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-3.3.5.203-eep-921.jar:org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.class */
public class BlockDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BlockDispatcher.class);
    private final boolean connectToDnViaHostname;
    private final int socketTimeout;
    private final int ioFileBufferSize;

    public BlockDispatcher(int i, int i2, boolean z) {
        this.socketTimeout = i;
        this.ioFileBufferSize = i2;
        this.connectToDnViaHostname = z;
    }

    public BlockMovementStatus moveBlock(BlockStorageMovementCommand.BlockMovingInfo blockMovingInfo, SaslDataTransferClient saslDataTransferClient, ExtendedBlock extendedBlock, Socket socket, DataEncryptionKeyFactory dataEncryptionKeyFactory, Token<BlockTokenIdentifier> token) {
        LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy storageType, sourceStoragetype:{} and destinStoragetype:{}", blockMovingInfo.getBlock(), blockMovingInfo.getSource(), blockMovingInfo.getTarget(), blockMovingInfo.getSourceStorageType(), blockMovingInfo.getTargetStorageType());
        DataOutputStream dataOutputStream = null;
        DataInputStream dataInputStream = null;
        try {
            try {
                try {
                    NetUtils.connect(socket, NetUtils.createSocketAddr(blockMovingInfo.getTarget().getXferAddr(this.connectToDnViaHostname)), this.socketTimeout);
                    socket.setSoTimeout(this.socketTimeout * 5);
                    socket.setKeepAlive(true);
                    OutputStream outputStream = socket.getOutputStream();
                    InputStream inputStream = socket.getInputStream();
                    LOG.debug("Connecting to datanode {}", blockMovingInfo.getTarget());
                    IOStreamPair socketSend = saslDataTransferClient.socketSend(socket, outputStream, inputStream, dataEncryptionKeyFactory, token, blockMovingInfo.getTarget());
                    OutputStream outputStream2 = socketSend.out;
                    InputStream inputStream2 = socketSend.in;
                    dataOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream2, this.ioFileBufferSize));
                    dataInputStream = new DataInputStream(new BufferedInputStream(inputStream2, this.ioFileBufferSize));
                    sendRequest(dataOutputStream, extendedBlock, token, blockMovingInfo.getSource(), blockMovingInfo.getTargetStorageType());
                    receiveResponse(dataInputStream);
                    LOG.info("Successfully moved block:{} from src:{} to destin:{} for satisfying storageType:{}", blockMovingInfo.getBlock(), blockMovingInfo.getSource(), blockMovingInfo.getTarget(), blockMovingInfo.getTargetStorageType());
                    BlockMovementStatus blockMovementStatus = BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
                    IOUtils.closeStream(dataOutputStream);
                    IOUtils.closeStream(dataInputStream);
                    IOUtils.closeSocket(socket);
                    return blockMovementStatus;
                } catch (BlockPinningException e) {
                    LOG.debug("Pinned block can't be moved, so skipping block:{}", blockMovingInfo.getBlock(), e);
                    BlockMovementStatus blockMovementStatus2 = BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
                    IOUtils.closeStream(dataOutputStream);
                    IOUtils.closeStream(dataInputStream);
                    IOUtils.closeSocket(socket);
                    return blockMovementStatus2;
                }
            } catch (IOException e2) {
                LOG.warn("Failed to move block:{} from src:{} to destin:{} to satisfy storageType:{}", blockMovingInfo.getBlock(), blockMovingInfo.getSource(), blockMovingInfo.getTarget(), blockMovingInfo.getTargetStorageType(), e2);
                BlockMovementStatus blockMovementStatus3 = BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
                IOUtils.closeStream(dataOutputStream);
                IOUtils.closeStream(dataInputStream);
                IOUtils.closeSocket(socket);
                return blockMovementStatus3;
            }
        } catch (Throwable th) {
            IOUtils.closeStream(dataOutputStream);
            IOUtils.closeStream(dataInputStream);
            IOUtils.closeSocket(socket);
            throw th;
        }
    }

    private static void sendRequest(DataOutputStream dataOutputStream, ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, DatanodeInfo datanodeInfo, StorageType storageType) throws IOException {
        new Sender(dataOutputStream).replaceBlock(extendedBlock, storageType, token, datanodeInfo.getDatanodeUuid(), datanodeInfo, null);
    }

    private static void receiveResponse(DataInputStream dataInputStream) throws IOException {
        DataTransferProtos.BlockOpResponseProto parseFrom = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(dataInputStream));
        while (true) {
            DataTransferProtos.BlockOpResponseProto blockOpResponseProto = parseFrom;
            if (blockOpResponseProto.getStatus() != DataTransferProtos.Status.IN_PROGRESS) {
                DataTransferProtoUtil.checkBlockOpStatus(blockOpResponseProto, "reportedBlock move is failed");
                return;
            }
            parseFrom = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(dataInputStream));
        }
    }
}
