package org.apache.hadoop.hdfs.server.datanode;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/DataXceiver.class */
public class DataXceiver extends Receiver implements Runnable {
    // SLF4J logger used for diagnostic messages throughout this class.
    public static final Logger LOG;
    // Commons-logging log whose isInfoEnabled() gates the client-trace records written by the short-circuit operations.
    static final Log ClientTraceLog;
    // Connection being served. Set to null by releaseSocket() once the peer has been handed off.
    private Peer peer;
    // peer.getRemoteAddressString(), captured at construction.
    private final String remoteAddress;
    // remoteAddress truncated at its first ':'; identical to remoteAddress when it contains no ':'.
    private final String remoteAddressWithoutPort;
    // peer.getLocalAddressString(), captured at construction.
    private final String localAddress;
    // DataNode on whose behalf this xceiver serves requests.
    private final DataNode datanode;
    // dataNode.getDnConf(), captured at construction; source of the socket timeouts used in run().
    private final DNConf dnConf;
    // Server that tracks this xceiver's peer (addPeer / closePeer / releasePeer).
    private final DataXceiverServer dataXceiverServer;
    // Copy of DNConf.connectToDnViaHostname taken at construction.
    private final boolean connectToDnViaHostname;
    // Time.monotonicNow() sampled in run() immediately before the current operation is processed.
    private long opStartTime;
    // Raw input stream of the peer, obtained at construction.
    private final InputStream socketIn;
    // Output stream for responses: initially the peer's stream, replaced in run() by the stream returned from SASL negotiation.
    private OutputStream socketOut;
    // Accessed through the synchronized setCurrentBlockReceiver()/getCurrentBlockReceiver(); null when none is set.
    private BlockReceiver blockReceiver;
    // DFSUtilClient.getIoFileBufferSize(conf), captured at construction.
    private final int ioFileBufferSize;
    // DFSUtilClient.getSmallBufferSize(conf); used to size the buffered input stream in run().
    private final int smallBufferSize;
    // Thread executing run(); stopWriter() interrupts it.
    private Thread xceiver;
    // Client name recorded by readBlock() (presumably by other operations outside this view as well); shown in the thread name.
    private String previousOpClientName;
    // Decompiler-exposed flag that backs the keepalive-timeout assertion in run().
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Static factory for a {@code DataXceiver} bound to the given connection.
     *
     * @param peer connection to serve
     * @param dataNode DataNode that owns the xceiver
     * @param dataXceiverServer server that tracks the xceiver's peer
     * @return a newly constructed xceiver
     * @throws IOException if the constructor fails
     */
    public static DataXceiver create(Peer peer, DataNode dataNode, DataXceiverServer dataXceiverServer) throws IOException {
        final DataXceiver created = new DataXceiver(peer, dataNode, dataXceiverServer);
        return created;
    }

    /**
     * Captures the connection's streams and addresses together with the
     * DataNode configuration values this xceiver needs.
     *
     * @param peer connection to serve
     * @param dataNode DataNode that owns the xceiver
     * @param dataXceiverServer server that tracks the xceiver's peer
     * @throws IOException if the peer's streams cannot be obtained
     */
    private DataXceiver(Peer peer, DataNode dataNode, DataXceiverServer dataXceiverServer) throws IOException {
        super(FsTracer.get((Configuration) null));
        // blockReceiver and xceiver are left at their default value (null).
        this.peer = peer;
        this.dnConf = dataNode.getDnConf();
        this.socketIn = peer.getInputStream();
        this.socketOut = peer.getOutputStream();
        this.datanode = dataNode;
        this.dataXceiverServer = dataXceiverServer;
        this.connectToDnViaHostname = dataNode.getDnConf().connectToDnViaHostname;
        this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(dataNode.getConf());
        this.smallBufferSize = DFSUtilClient.getSmallBufferSize(dataNode.getConf());

        final String remote = peer.getRemoteAddressString();
        this.remoteAddress = remote;
        // Everything before the first ':' is kept as the port-less address.
        final int colonAt = remote.indexOf(':');
        if (colonAt < 0) {
            this.remoteAddressWithoutPort = remote;
        } else {
            this.remoteAddressWithoutPort = remote.substring(0, colonAt);
        }
        this.localAddress = peer.getLocalAddressString();
        LOG.debug("Number of active connections is: {}", Integer.valueOf(dataNode.getXceiverCount()));
    }

    /**
     * Renames the calling thread to
     * {@code "DataXceiver for client [<client> at ]<remoteAddress>[ [<status>]]"}.
     *
     * @param str status text appended in square brackets; omitted when null
     */
    private void updateCurrentThreadName(String str) {
        final String clientPart = previousOpClientName == null ? "" : previousOpClientName + " at ";
        final String statusPart = str == null ? "" : " [" + str + "]";
        Thread.currentThread().setName("DataXceiver for client " + clientPart + remoteAddress + statusPart);
    }

    /**
     * @return the DataNode this xceiver was created for
     */
    DataNode getDataNode() {
        return datanode;
    }

    /**
     * @return the stream currently used for writing to the peer
     */
    private OutputStream getOutputStream() {
        return socketOut;
    }

    /**
     * Asks the current block receiver, if any, to send an OOB message to the
     * peer. Does nothing when no block receiver is set.
     *
     * @throws IOException propagated from the block receiver
     * @throws InterruptedException propagated from the block receiver
     */
    public void sendOOB() throws IOException, InterruptedException {
        final BlockReceiver receiver = getCurrentBlockReceiver();
        if (receiver != null) {
            LOG.info("Sending OOB to peer: {}", peer);
            receiver.sendOOB();
        }
    }

    /**
     * Interrupts the xceiver thread, but only while a block receiver is set.
     * The check and the interrupt happen under this object's monitor.
     */
    public void stopWriter() {
        synchronized (this) {
            if (getCurrentBlockReceiver() != null) {
                xceiver.interrupt();
                LOG.info("Stopped the writer: {}", peer);
            }
        }
    }

    /**
     * Records the active block receiver while holding this object's monitor.
     *
     * @param blockReceiver receiver to record; may be null to clear it
     */
    private void setCurrentBlockReceiver(BlockReceiver blockReceiver) {
        synchronized (this) {
            this.blockReceiver = blockReceiver;
        }
    }

    /**
     * Reads the active block receiver while holding this object's monitor.
     *
     * @return the recorded block receiver, or null when none is set
     */
    private BlockReceiver getCurrentBlockReceiver() {
        synchronized (this) {
            return blockReceiver;
        }
    }

    /**
     * Serves one connection: performs the SASL handshake, then reads and
     * processes operations in a loop until the peer goes away, the keepalive
     * timeout is disabled, or an error occurs.
     *
     * <p>Waiting for the next operation is expected to end with EOF, a closed
     * channel or a timeout; those cases end the loop quietly. Any other
     * failure is logged together with the operation that was being processed.
     * Cleanup (metrics collection, thread rename, peer close) always runs.
     */
    @Override // java.lang.Runnable
    public void run() {
        int opsProcessed = 0;
        // Remembered outside the loop so the error handler can name the failing operation.
        Op op = null;
        try {
            synchronized (this) {
                this.xceiver = Thread.currentThread();
            }
            this.dataXceiverServer.addPeer(this.peer, Thread.currentThread(), this);
            this.peer.setWriteTimeout(this.datanode.getDnConf().socketWriteTimeout);

            InputStream input;
            try {
                IOStreamPair saslStreams = this.datanode.saslServer.receive(this.peer, this.socketOut, this.socketIn, this.datanode.getXferAddress().getPort(), this.datanode.getDatanodeId());
                input = new BufferedInputStream(saslStreams.in, this.smallBufferSize);
                this.socketOut = saslStreams.out;
            } catch (InvalidMagicNumberException e) {
                if (e.isHandshake4Encryption()) {
                    LOG.info("Failed to read expected encryption handshake from client at {}. Perhaps the client is running an older version of Hadoop which does not support encryption", this.peer.getRemoteAddressString(), e);
                } else {
                    LOG.info("Failed to read expected SASL data transfer protection handshake from client at {}. Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection", this.peer.getRemoteAddressString(), e);
                }
                // Handshake failed: nothing to serve. The finally block still cleans up.
                return;
            }
            super.initialize(new DataInputStream(input));

            do {
                updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
                try {
                    if (opsProcessed != 0) {
                        if (!$assertionsDisabled && this.dnConf.socketKeepaliveTimeout <= 0) {
                            throw new AssertionError();
                        }
                        // Between operations only wait for the (shorter) keepalive period.
                        this.peer.setReadTimeout(this.dnConf.socketKeepaliveTimeout);
                    } else {
                        this.peer.setReadTimeout(this.dnConf.socketTimeout);
                    }
                    op = readOp();
                } catch (InterruptedIOException ignored) {
                    // Timed out (or was interrupted) while waiting for the next operation.
                    break;
                } catch (EOFException | ClosedChannelException e) {
                    // The next operation is awaited optimistically, so EOF here is normal.
                    LOG.debug("Cached {} closing after {} ops.  This message is usually benign.", this.peer, Integer.valueOf(opsProcessed));
                    break;
                } catch (IOException e) {
                    incrDatanodeNetworkErrors();
                    throw e;
                }

                // Restore the normal timeout for the duration of the operation.
                if (opsProcessed != 0) {
                    this.peer.setReadTimeout(this.dnConf.socketTimeout);
                }
                this.opStartTime = Time.monotonicNow();
                processOp(op);
                opsProcessed++;
            } while (this.peer != null && !this.peer.isClosed() && this.dnConf.socketKeepaliveTimeout > 0);
        } catch (Throwable t) {
            String msg = this.datanode.getDisplayName() + ":DataXceiver error processing " + (op == null ? "unknown" : op.name()) + " operation  src: " + this.remoteAddress + " dst: " + this.localAddress;
            if (op == Op.WRITE_BLOCK && (t instanceof ReplicaAlreadyExistsException)) {
                // Full stack trace only at trace level.
                if (LOG.isTraceEnabled()) {
                    LOG.trace(msg, t);
                } else {
                    LOG.info("{}; {}", msg, t.toString());
                }
            } else if (op == Op.READ_BLOCK && (t instanceof SocketTimeoutException)) {
                String detail = "Likely the client has stopped reading, disconnecting it (" + msg + ")";
                if (LOG.isTraceEnabled()) {
                    LOG.trace(detail, t);
                } else {
                    LOG.info("{}; {}", detail, t.toString());
                }
            } else if ((t instanceof SecretManager.InvalidToken) || (t.getCause() instanceof SecretManager.InvalidToken)) {
                LOG.trace(msg, t);
            } else {
                LOG.error(msg, t);
            }
        } finally {
            collectThreadLocalStates();
            LOG.debug("{}:Number of active connections is: {}", this.datanode.getDisplayName(), Integer.valueOf(this.datanode.getXceiverCount()));
            updateCurrentThreadName("Cleaning up");
            // peer is null when the socket was handed off via releaseSocket().
            if (this.peer != null) {
                this.dataXceiverServer.closePeer(this.peer);
                IOUtils.closeStream(this.in);
            }
        }
    }

    /**
     * Forwards to the DataNode's peer metrics, provided peer statistics are
     * enabled in the configuration and a metrics object exists.
     */
    private void collectThreadLocalStates() {
        if (datanode.getDnConf().peerStatsEnabled && datanode.getPeerMetrics() != null) {
            datanode.getPeerMetrics().collectThreadLocalStates();
        }
    }

    /**
     * Handles a request for short-circuit read file descriptors: checks
     * access, optionally registers the shared-memory slot, obtains the file
     * streams, replies with a status message and, on success, passes the
     * descriptors over the UNIX domain socket.
     *
     * <p>Failures while preparing the descriptors are reported to the client
     * in the response message rather than thrown. If the operation does not
     * complete, a slot registered here is unregistered again. Any obtained
     * file streams are always closed before returning.
     *
     * @param extendedBlock block whose descriptors are requested
     * @param token block access token checked for READ access
     * @param slotId shared-memory slot to register for the block; may be null
     * @param i version argument forwarded to {@code requestShortCircuitFdsForRead}
     * @param z whether to wait for a receipt verification byte from the client
     * @throws IOException if the access check or writing to the client fails
     */
    public void requestShortCircuitFds(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, ShortCircuitShm.SlotId slotId, int i, boolean z) throws IOException {
        updateCurrentThreadName("Passing file descriptors for block " + extendedBlock);
        checkAccess(getBufferedOutputStream(), true, extendedBlock, token, Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ, null, null);
        DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder();
        FileInputStream[] streams = null;
        ShortCircuitShm.SlotId registeredSlotId = null;
        boolean success = false;
        try {
            try {
                if (this.peer.getDomainSocket() == null) {
                    throw new IOException("You cannot pass file descriptors over anything but a UNIX domain socket.");
                }
                if (slotId != null) {
                    boolean isCached = this.datanode.data.isCached(extendedBlock.getBlockPoolId(), extendedBlock.getBlockId());
                    this.datanode.shortCircuitRegistry.registerSlot(ExtendedBlockId.fromExtendedBlock(extendedBlock), slotId, isCached);
                    // Remember the registration so it can be undone if anything below fails.
                    registeredSlotId = slotId;
                }
                streams = this.datanode.requestShortCircuitFdsForRead(extendedBlock, token, i);
                Preconditions.checkState(streams != null);
                response.setStatus(DataTransferProtos.Status.SUCCESS);
                response.setShortCircuitAccessVersion(1);
            } catch (DataNode.ShortCircuitFdsVersionException e) {
                response.setStatus(DataTransferProtos.Status.ERROR_UNSUPPORTED);
                response.setShortCircuitAccessVersion(1);
                response.setMessage(e.getMessage());
            } catch (DataNode.ShortCircuitFdsUnsupportedException e) {
                response.setStatus(DataTransferProtos.Status.ERROR_UNSUPPORTED);
                response.setMessage(e.getMessage());
            } catch (IOException e) {
                response.setStatus(DataTransferProtos.Status.ERROR);
                response.setMessage(e.getMessage());
                LOG.error("Request short-circuit read file descriptor failed with unknown error.", e);
            }

            // The status message is sent whether or not descriptors were obtained.
            response.build().writeDelimitedTo(this.socketOut);
            if (streams != null) {
                FileDescriptor[] fds = new FileDescriptor[streams.length];
                for (int idx = 0; idx < fds.length; idx++) {
                    fds[idx] = streams[idx].getFD();
                }
                // One payload byte tells the client whether receipt verification is in use.
                byte[] payload = new byte[1];
                if (z) {
                    payload[0] = (byte) DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION.getNumber();
                } else {
                    payload[0] = (byte) DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION.getNumber();
                }
                DomainSocket sock = this.peer.getDomainSocket();
                sock.sendFileDescriptors(fds, payload, 0, payload.length);
                if (z) {
                    LOG.trace("Reading receipt verification byte for {}", slotId);
                    if (sock.getInputStream().read() < 0) {
                        throw new EOFException();
                    }
                } else {
                    LOG.trace("Receipt verification is not enabled on the DataNode. Not verifying {}", slotId);
                }
                success = true;
            }
        } finally {
            if (!success && registeredSlotId != null) {
                LOG.info("Unregistering {} because the requestShortCircuitFdsForRead operation failed.", registeredSlotId);
                this.datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
            }
            if (ClientTraceLog.isInfoEnabled()) {
                BlockSender.ClientTraceLog.info(String.format("src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS, blockid: %s, srvID: %s, success: %b", Long.valueOf(extendedBlock.getBlockId()), this.datanode.getDNRegistrationForBP(extendedBlock.getBlockPoolId()).getDatanodeUuid(), Boolean.valueOf(success)));
            }
            // Always close the streams obtained here, on success and on failure alike.
            if (streams != null) {
                IOUtils.cleanupWithLogger((Logger) null, streams);
            }
        }
    }

    /**
     * Unregisters a short-circuit shared-memory slot and reports the outcome
     * to the client.
     *
     * <p>The response status is SUCCESS when unregistering worked,
     * ERROR_UNSUPPORTED when the registry throws
     * {@link UnsupportedOperationException}, and ERROR_INVALID (with the
     * throwable's message) for any other failure. A client-trace record is
     * written in all cases.
     *
     * @param slotId slot to unregister
     * @throws IOException if the response cannot be written
     */
    public void releaseShortCircuitFds(ShortCircuitShm.SlotId slotId) throws IOException {
        boolean success = false;
        try {
            String error;
            DataTransferProtos.Status status;
            try {
                this.datanode.shortCircuitRegistry.unregisterSlot(slotId);
                error = null;
                status = DataTransferProtos.Status.SUCCESS;
            } catch (UnsupportedOperationException e) {
                // Must come before the catch-all below, otherwise it can never match.
                error = "unsupported operation";
                status = DataTransferProtos.Status.ERROR_UNSUPPORTED;
            } catch (Throwable t) {
                error = t.getMessage();
                status = DataTransferProtos.Status.ERROR_INVALID;
            }
            DataTransferProtos.ReleaseShortCircuitAccessResponseProto.Builder response = DataTransferProtos.ReleaseShortCircuitAccessResponseProto.newBuilder();
            response.setStatus(status);
            if (error != null) {
                response.setError(error);
            }
            response.build().writeDelimitedTo(this.socketOut);
            // "success" means the response was delivered, whatever status it carried.
            success = true;
        } finally {
            if (ClientTraceLog.isInfoEnabled()) {
                BlockSender.ClientTraceLog.info(String.format("src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS, shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b", Long.valueOf(slotId.getShmId().getHi()), Long.valueOf(slotId.getShmId().getLo()), Integer.valueOf(slotId.getSlotIdx()), this.datanode.getDatanodeUuid(), Boolean.valueOf(success)));
            }
        }
    }

    /**
     * Writes a length-delimited shared-memory response carrying an error.
     *
     * @param status status code to report
     * @param str error text to report
     * @throws IOException if writing to the peer fails
     */
    private void sendShmErrorResponse(DataTransferProtos.Status status, String str) throws IOException {
        final DataTransferProtos.ShortCircuitShmResponseProto.Builder response = DataTransferProtos.ShortCircuitShmResponseProto.newBuilder();
        response.setStatus(status);
        response.setError(str);
        response.build().writeDelimitedTo(socketOut);
    }

    /**
     * Writes a SUCCESS shared-memory response containing the segment id, then
     * passes the segment's file descriptor over the domain socket together
     * with a single zero byte.
     *
     * @param domainSocket socket used to pass the file descriptor
     * @param newShmInfo the shared-memory segment being handed to the client
     * @throws IOException if writing the response or passing the descriptor fails
     */
    private void sendShmSuccessResponse(DomainSocket domainSocket, ShortCircuitRegistry.NewShmInfo newShmInfo) throws IOException {
        DataNodeFaultInjector.get().sendShortCircuitShmResponse();

        final DataTransferProtos.ShortCircuitShmResponseProto.Builder response = DataTransferProtos.ShortCircuitShmResponseProto.newBuilder();
        response.setStatus(DataTransferProtos.Status.SUCCESS);
        response.setId(PBHelperClient.convert(newShmInfo.getShmId()));
        response.build().writeDelimitedTo(socketOut);

        final byte[] payload = new byte[1];
        final FileDescriptor[] descriptors = new FileDescriptor[]{newShmInfo.getFileStream().getFD()};
        domainSocket.sendFileDescriptors(descriptors, payload, 0, payload.length);
    }

    /**
     * Handles a request for a short-circuit shared-memory segment.
     *
     * <p>The request must arrive over a UNIX domain socket. A new segment is
     * created in the short-circuit registry, the socket is released from this
     * xceiver, and the segment's descriptor is sent to the client. Failures
     * to create the segment are reported to the client as an error response.
     * A failure while sending the success response propagates to the caller.
     *
     * @param str client name, used for segment creation and trace logging
     * @throws IOException if a response cannot be written to the client
     */
    public void requestShortCircuitShm(String str) throws IOException {
        ShortCircuitRegistry.NewShmInfo shmInfo = null;
        boolean success = false;
        DomainSocket sock = this.peer.getDomainSocket();
        try {
            if (sock == null) {
                sendShmErrorResponse(DataTransferProtos.Status.ERROR_INVALID, "Bad request from " + this.peer + ": must request a shared memory segment over a UNIX domain socket.");
                return;
            }
            try {
                shmInfo = this.datanode.shortCircuitRegistry.createNewMemorySegment(str, sock);
                // From here on this xceiver no longer owns the peer (releaseSocket() nulls it).
                releaseSocket();
            } catch (UnsupportedOperationException e) {
                sendShmErrorResponse(DataTransferProtos.Status.ERROR_UNSUPPORTED, "This datanode has not been configured to support short-circuit shared memory segments.");
                return;
            } catch (IOException e) {
                sendShmErrorResponse(DataTransferProtos.Status.ERROR, "Failed to create shared file descriptor: " + e.getMessage());
                return;
            }
            // Deliberately outside the try above: a send failure is not a creation failure.
            sendShmSuccessResponse(sock, shmInfo);
            success = true;
        } finally {
            if (ClientTraceLog.isInfoEnabled()) {
                if (success) {
                    BlockSender.ClientTraceLog.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", str, Long.valueOf(shmInfo.getShmId().getHi()), Long.valueOf(shmInfo.getShmId().getLo()), this.datanode.getDatanodeUuid()));
                } else {
                    BlockSender.ClientTraceLog.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", str, this.datanode.getDatanodeUuid()));
                }
            }
            // peer == null means releaseSocket() ran, i.e. the segment exists but the
            // client never received it; shut the socket down instead of leaving it idle.
            if (!success && this.peer == null) {
                try {
                    LOG.warn("Failed to send success response back to the client. Shutting down socket for {}", shmInfo.getShmId());
                    sock.shutdown();
                } catch (IOException e) {
                    LOG.warn("Failed to shut down socket in error handler", e);
                }
            }
            IOUtils.cleanupWithLogger((Logger) null, new Closeable[]{shmInfo});
        }
    }

    /**
     * Hands the peer back to the xceiver server via {@code releasePeer} and
     * clears this xceiver's reference to it.
     */
    void releaseSocket() {
        final Peer handedOff = peer;
        dataXceiverServer.releasePeer(handedOff);
        peer = null;
    }

    /**
     * Serves a block read: checks access, creates a {@code BlockSender},
     * streams the requested byte range to the client and updates read
     * metrics.
     *
     * <p>If the sender cannot be created, an ERROR response is sent before
     * the exception is rethrown. A {@link SocketException} during the
     * transfer is ignored. Any other {@link IOException} is passed to
     * {@code handleBadBlock} and rethrown. The sender is closed on every path.
     *
     * @param extendedBlock block to read
     * @param token block access token checked for READ access
     * @param str client name; also used in the thread name and trace format
     * @param j offset argument passed to the {@code BlockSender}
     * @param j2 length argument passed to the {@code BlockSender}
     * @param z flag passed to the {@code BlockSender} (presumably "send checksums" — confirm against BlockSender)
     * @param cachingStrategy caching strategy passed to the {@code BlockSender}
     * @throws IOException if the access check fails or the block cannot be served
     */
    public void readBlock(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, String str, long j, long j2, boolean z, CachingStrategy cachingStrategy) throws IOException {
        this.previousOpClientName = str;
        long read = 0;
        updateCurrentThreadName("Sending block " + extendedBlock);
        OutputStream baseStream = getOutputStream();
        DataOutputStream out = getBufferedOutputStream();
        checkAccess(out, true, extendedBlock, token, Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);

        BlockSender blockSender = null;
        DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(extendedBlock.getBlockPoolId());
        // The "%d" arguments stay as literal placeholders in the resulting string.
        String clientTraceFmt = (str.length() > 0 && ClientTraceLog.isInfoEnabled())
                ? String.format(DataNode.DN_CLIENTTRACE_FORMAT, this.localAddress, this.remoteAddress, "%d", "HDFS_READ", str, "%d", dnR.getDatanodeUuid(), extendedBlock, "%d")
                : dnR + " Served block " + extendedBlock + " to " + this.remoteAddress;
        try {
            try {
                blockSender = new BlockSender(extendedBlock, j, j2, true, false, z, this.datanode, clientTraceFmt, cachingStrategy);
            } catch (IOException e) {
                // Only a failure to set up the sender is answered with an ERROR response.
                String msg = "opReadBlock " + extendedBlock + " received exception " + e;
                LOG.info(msg);
                sendResponse(DataTransferProtos.Status.ERROR, msg);
                throw e;
            }

            writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
            long beginRead = Time.monotonicNow();
            read = blockSender.sendBlock(out, baseStream, null);
            long duration = Time.monotonicNow() - beginRead;
            if (blockSender.didSendEntireByteRange()) {
                // After a complete transfer the client is expected to answer with a status.
                try {
                    if (!DataTransferProtos.ClientReadStatusProto.parseFrom(PBHelperClient.vintPrefixed(this.in)).hasStatus()) {
                        LOG.warn("Client {} did not send a valid status code after reading. Will close connection.", this.peer.getRemoteAddressString());
                        IOUtils.closeStream(out);
                    }
                } catch (IOException e) {
                    LOG.debug("Error reading client status response. Will close connection.", e);
                    IOUtils.closeStream(out);
                    incrDatanodeNetworkErrors();
                }
            } else {
                IOUtils.closeStream(out);
            }
            this.datanode.metrics.incrBytesRead((int) read);
            this.datanode.metrics.incrBlocksRead();
            this.datanode.metrics.incrTotalReadTime(duration);
        } catch (SocketException ignored) {
            // Handled before the generic IOException case so it is not reported as a bad block.
            LOG.trace("{}:Ignoring exception while serving {} to {}", new Object[]{dnR, extendedBlock, this.remoteAddress, ignored});
            this.datanode.metrics.incrBlocksRead();
            IOUtils.closeStream(out);
        } catch (IOException e) {
            if (!(e instanceof SocketTimeoutException)) {
                LOG.warn("{}:Got exception while serving {} to {}", new Object[]{dnR, extendedBlock, this.remoteAddress, e});
                incrDatanodeNetworkErrors();
            }
            this.datanode.handleBadBlock(extendedBlock, e, false);
            throw e;
        } finally {
            IOUtils.closeStream(blockSender);
        }

        this.datanode.metrics.addReadBlockOp(elapsed());
        this.datanode.metrics.incrReadsFromClient(this.peer.isLocal(), read);
    }

    /* JADX WARN: Removed duplicated region for block: B:108:0x05f7 A[Catch: IOException -> 0x0620, all -> 0x063a, TryCatch #1 {IOException -> 0x0620, blocks: (B:115:0x021c, B:117:0x026a, B:45:0x027d, B:47:0x0295, B:49:0x02c2, B:51:0x0316, B:52:0x0322, B:54:0x034a, B:55:0x0368, B:57:0x03bb, B:60:0x03c6, B:62:0x03cc, B:63:0x0436, B:65:0x0446, B:67:0x0466, B:70:0x0404, B:74:0x0483, B:75:0x04a3, B:77:0x04c0, B:78:0x04e6, B:79:0x04e7, B:84:0x0519, B:86:0x0521, B:87:0x0534, B:88:0x054e, B:93:0x0560, B:95:0x0580, B:99:0x0599, B:101:0x05a1, B:104:0x05b2, B:108:0x05f7, B:113:0x05ba, B:44:0x0224), top: B:114:0x021c, outer: #0 }] */
    /* JADX WARN: Removed duplicated region for block: B:47:0x0295 A[Catch: IOException -> 0x0620, all -> 0x063a, TryCatch #1 {IOException -> 0x0620, blocks: (B:115:0x021c, B:117:0x026a, B:45:0x027d, B:47:0x0295, B:49:0x02c2, B:51:0x0316, B:52:0x0322, B:54:0x034a, B:55:0x0368, B:57:0x03bb, B:60:0x03c6, B:62:0x03cc, B:63:0x0436, B:65:0x0446, B:67:0x0466, B:70:0x0404, B:74:0x0483, B:75:0x04a3, B:77:0x04c0, B:78:0x04e6, B:79:0x04e7, B:84:0x0519, B:86:0x0521, B:87:0x0534, B:88:0x054e, B:93:0x0560, B:95:0x0580, B:99:0x0599, B:101:0x05a1, B:104:0x05b2, B:108:0x05f7, B:113:0x05ba, B:44:0x0224), top: B:114:0x021c, outer: #0 }] */
    /* JADX WARN: Removed duplicated region for block: B:86:0x0521 A[Catch: IOException -> 0x0620, all -> 0x063a, TryCatch #1 {IOException -> 0x0620, blocks: (B:115:0x021c, B:117:0x026a, B:45:0x027d, B:47:0x0295, B:49:0x02c2, B:51:0x0316, B:52:0x0322, B:54:0x034a, B:55:0x0368, B:57:0x03bb, B:60:0x03c6, B:62:0x03cc, B:63:0x0436, B:65:0x0446, B:67:0x0466, B:70:0x0404, B:74:0x0483, B:75:0x04a3, B:77:0x04c0, B:78:0x04e6, B:79:0x04e7, B:84:0x0519, B:86:0x0521, B:87:0x0534, B:88:0x054e, B:93:0x0560, B:95:0x0580, B:99:0x0599, B:101:0x05a1, B:104:0x05b2, B:108:0x05f7, B:113:0x05ba, B:44:0x0224), top: B:114:0x021c, outer: #0 }] */
    /* JADX WARN: Removed duplicated region for block: B:90:0x0555  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the WRITE_BLOCK operation handler.
     *
     * <p>The decompiler could not reconstruct this method's body (see the JADX
     * warnings above), so the real logic is absent from this source file. As
     * written here, the method does nothing except throw
     * {@link UnsupportedOperationException}; the parameter names
     * ({@code r25} ... {@code r46}) are decompiler register names, not
     * meaningful identifiers.
     *
     * @throws UnsupportedOperationException always, in this decompiled form
     * @throws java.io.IOException declared by the signature; never thrown by this stub
     */
    public void writeBlock(org.apache.hadoop.hdfs.protocol.ExtendedBlock r25, org.apache.hadoop.fs.StorageType r26, org.apache.hadoop.security.token.Token<org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier> r27, java.lang.String r28, org.apache.hadoop.hdfs.protocol.DatanodeInfo[] r29, org.apache.hadoop.fs.StorageType[] r30, org.apache.hadoop.hdfs.protocol.DatanodeInfo r31, org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage r32, int r33, long r34, long r36, long r38, org.apache.hadoop.util.DataChecksum r40, org.apache.hadoop.hdfs.server.datanode.CachingStrategy r41, boolean r42, boolean r43, boolean[] r44, java.lang.String r45, java.lang.String[] r46) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 1667
            To view this dump add '--comments-level debug' option
        */
        // Decompiler stub: the original bytecode is not represented here.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(org.apache.hadoop.hdfs.protocol.ExtendedBlock, org.apache.hadoop.fs.StorageType, org.apache.hadoop.security.token.Token, java.lang.String, org.apache.hadoop.hdfs.protocol.DatanodeInfo[], org.apache.hadoop.fs.StorageType[], org.apache.hadoop.hdfs.protocol.DatanodeInfo, org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage, int, long, long, long, org.apache.hadoop.util.DataChecksum, org.apache.hadoop.hdfs.server.datanode.CachingStrategy, boolean, boolean, boolean[], java.lang.String, java.lang.String[]):void");
    }

    /**
     * Handles a {@code TRANSFER_BLOCK} request: after verifying the caller's
     * block token for COPY access, asks the datanode to transfer the replica
     * to the given targets for pipeline recovery and replies with SUCCESS.
     *
     * @param extendedBlock   block whose replica is transferred
     * @param token           block access token presented by the caller
     * @param str             client name; recorded as the previous op's client name
     * @param datanodeInfoArr transfer targets
     * @param storageTypeArr  storage types passed to the access check and the transfer
     * @param strArr          string array passed to the access check and the transfer
     *                        (presumably target storage IDs; confirm against caller)
     * @throws IOException if the access check, the transfer, or writing the
     *                     reply fails; transfer/reply failures are also counted
     *                     as datanode network errors
     */
    public void transferBlock(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, String str, DatanodeInfo[] datanodeInfoArr, StorageType[] storageTypeArr, String[] strArr) throws IOException {
        this.previousOpClientName = str;
        updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + extendedBlock);
        final DataOutputStream reply = new DataOutputStream(getOutputStream());
        checkAccess(reply, true, extendedBlock, token, Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY, storageTypeArr, strArr);
        try {
            this.datanode.transferReplicaForPipelineRecovery(extendedBlock, datanodeInfoArr, storageTypeArr, strArr, str);
            writeResponse(DataTransferProtos.Status.SUCCESS, null, reply);
        } catch (IOException ioe) {
            LOG.info("transferBlock {} received exception {}", extendedBlock, ioe.toString());
            incrDatanodeNetworkErrors();
            throw ioe;
        } finally {
            // The reply stream is closed on both the success and failure paths.
            IOUtils.closeStream(reply);
        }
    }

    /**
     * Handles a {@code BLOCK_CHECKSUM} request: verifies READ access, computes
     * the checksum of the replicated block, and writes a SUCCESS response
     * carrying the checksum details back to the caller.
     *
     * @param extendedBlock        block to checksum
     * @param token                block access token presented by the caller
     * @param blockChecksumOptions options forwarded to the checksum computer and
     *                             echoed in the response
     * @throws IOException if the access check, the computation, or writing the
     *                     response fails; the latter two are also counted as
     *                     datanode network errors
     */
    public void blockChecksum(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, BlockChecksumOptions blockChecksumOptions) throws IOException {
        updateCurrentThreadName("Getting checksum for block " + extendedBlock);
        final DataOutputStream reply = new DataOutputStream(getOutputStream());
        checkAccess(reply, true, extendedBlock, token, Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
        final BlockChecksumHelper.ReplicatedBlockChecksumComputer computer = new BlockChecksumHelper.ReplicatedBlockChecksumComputer(this.datanode, extendedBlock, blockChecksumOptions);
        try {
            computer.compute();
            final DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder()
                    .setStatus(DataTransferProtos.Status.SUCCESS);
            final DataTransferProtos.OpBlockChecksumResponseProto.Builder checksum = DataTransferProtos.OpBlockChecksumResponseProto.newBuilder()
                    .setBytesPerCrc(computer.getBytesPerCRC())
                    .setCrcPerBlock(computer.getCrcPerBlock())
                    .setBlockChecksum(ByteString.copyFrom(computer.getOutBytes()))
                    .setCrcType(PBHelperClient.convert(computer.getCrcType()))
                    .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions));
            response.setChecksumResponse(checksum).build().writeDelimitedTo(reply);
            reply.flush();
        } catch (IOException ioe) {
            LOG.info("blockChecksum {} received exception {}", extendedBlock, ioe.toString());
            incrDatanodeNetworkErrors();
            throw ioe;
        } finally {
            IOUtils.closeStream(reply);
        }
        // Only reached on success: record how long the operation took.
        this.datanode.metrics.addBlockChecksumOp(elapsed());
    }

    /**
     * Handles a {@code BLOCK_GROUP_CHECKSUM} request: verifies READ access on
     * the group's block, computes the block-group checksum, and writes a
     * SUCCESS response carrying the checksum details back to the caller.
     *
     * @param stripedBlockInfo     description of the striped block group
     * @param token                block access token presented by the caller
     * @param j                    numeric argument forwarded to the checksum
     *                             computer (presumably the requested byte count;
     *                             confirm against BlockChecksumHelper)
     * @param blockChecksumOptions options forwarded to the checksum computer and
     *                             echoed in the response
     * @throws IOException if the access check, the computation, or writing the
     *                     response fails; the latter two are also counted as
     *                     datanode network errors
     */
    public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, Token<BlockTokenIdentifier> token, long j, BlockChecksumOptions blockChecksumOptions) throws IOException {
        final ExtendedBlock groupBlock = stripedBlockInfo.getBlock();
        updateCurrentThreadName("Getting checksum for block group" + groupBlock);
        final DataOutputStream reply = new DataOutputStream(getOutputStream());
        checkAccess(reply, true, groupBlock, token, Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
        final BlockChecksumHelper.BlockGroupNonStripedChecksumComputer computer = new BlockChecksumHelper.BlockGroupNonStripedChecksumComputer(this.datanode, stripedBlockInfo, j, blockChecksumOptions);
        try {
            computer.compute();
            final DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder()
                    .setStatus(DataTransferProtos.Status.SUCCESS);
            final DataTransferProtos.OpBlockChecksumResponseProto.Builder checksum = DataTransferProtos.OpBlockChecksumResponseProto.newBuilder()
                    .setBytesPerCrc(computer.getBytesPerCRC())
                    .setCrcPerBlock(computer.getCrcPerBlock())
                    .setBlockChecksum(ByteString.copyFrom(computer.getOutBytes()))
                    .setCrcType(PBHelperClient.convert(computer.getCrcType()))
                    .setBlockChecksumOptions(PBHelperClient.convert(blockChecksumOptions));
            response.setChecksumResponse(checksum).build().writeDelimitedTo(reply);
            reply.flush();
        } catch (IOException ioe) {
            LOG.info("blockChecksum {} received exception {}", stripedBlockInfo.getBlock(), ioe.toString());
            incrDatanodeNetworkErrors();
            throw ioe;
        } finally {
            IOUtils.closeStream(reply);
        }
        // Only reached on success: record how long the operation took.
        this.datanode.metrics.addBlockChecksumOp(elapsed());
    }

    /**
     * Handles a {@code COPY_BLOCK} request: streams the whole block to the
     * requesting peer, throttled by the balancer throttler.
     *
     * <p>The request is refused with an error response (and no exception) when
     * the block is pinned or when no balancer mover slot can be acquired.
     * Otherwise a success header with checksum info is sent, followed by the
     * block data. After a successful copy a single trailing character
     * ({@code 'd'}) is written once the throttler slot has been released; the
     * receiving side in {@code replaceBlock} reads one character after a
     * successful transfer.
     *
     * <p>The trailing character is written only when the copy actually
     * succeeded. The previous form also wrote it on the failure path, which
     * would signal completion to the peer for a copy that had failed.
     *
     * @param extendedBlock block to copy
     * @param token         block access token presented by the caller
     * @throws IOException if the access check fails or the block cannot be sent;
     *                     send failures are counted as network errors and
     *                     reported via {@code handleBadBlock}
     */
    public void copyBlock(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token) throws IOException {
        updateCurrentThreadName("Copying block " + extendedBlock);
        DataOutputStream reply = getBufferedOutputStream();
        checkAccess(reply, true, extendedBlock, token, Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
        if (this.datanode.data.getPinning(extendedBlock)) {
            String msg = "Not able to copy block " + extendedBlock.getBlockId() + " to " + this.peer.getRemoteAddressString() + " because it's pinned ";
            LOG.info(msg);
            sendResponse(DataTransferProtos.Status.ERROR_BLOCK_PINNED, msg);
            return;
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            String msg = "Not able to copy block " + extendedBlock.getBlockId() + " to " + this.peer.getRemoteAddressString() + " because threads quota=" + this.dataXceiverServer.balanceThrottler.getMaxConcurrentMovers() + " is exceeded.";
            LOG.info(msg);
            sendResponse(DataTransferProtos.Status.ERROR, msg);
            return;
        }
        BlockSender blockSender = null;
        // Flipped to true only after the whole block has been sent.
        boolean copied = false;
        try {
            blockSender = new BlockSender(extendedBlock, 0L, -1L, false, false, true, this.datanode, null, CachingStrategy.newDropBehind());
            OutputStream baseStream = getOutputStream();
            // Status and checksum header go first, then the block content.
            writeSuccessWithChecksumInfo(blockSender, reply);
            long beginRead = Time.monotonicNow();
            long bytesSent = blockSender.sendBlock(reply, baseStream, this.dataXceiverServer.balanceThrottler);
            long duration = Time.monotonicNow() - beginRead;
            this.datanode.metrics.incrBytesRead((int) bytesSent);
            this.datanode.metrics.incrBlocksRead();
            this.datanode.metrics.incrTotalReadTime(duration);
            LOG.info("Copied {} to {}", extendedBlock, this.peer.getRemoteAddressString());
            copied = true;
        } catch (IOException ioe) {
            LOG.info("opCopyBlock {} received exception {}", extendedBlock, ioe.toString());
            incrDatanodeNetworkErrors();
            this.datanode.handleBadBlock(extendedBlock, ioe, false);
            throw ioe;
        } finally {
            // Release the mover slot before telling the peer we are done.
            this.dataXceiverServer.balanceThrottler.release();
            if (copied) {
                try {
                    reply.writeChar('d');
                } catch (IOException ignored) {
                    // Best effort: the block itself was already sent successfully.
                }
            }
            IOUtils.closeStream(reply);
            IOUtils.closeStream(blockSender);
        }
        // Only reached on success.
        this.datanode.metrics.addCopyBlockOp(elapsed());
    }

    /**
     * Handles a {@code REPLACE_BLOCK} request: obtains a replica of the block
     * either by moving it across storage on this datanode (when the proxy
     * source is this datanode) or by copying it from the proxy source datanode,
     * then replies to the caller with the outcome.
     *
     * <p>The request is refused with an ERROR response (and no exception) when
     * no balancer mover slot can be acquired.
     *
     * <p>Fixes relative to the previous form of this method:
     * <ul>
     *   <li>the status sent back now reflects the real outcome (ERROR or
     *       ERROR_BLOCK_PINNED, with the error text) instead of always
     *       SUCCESS;</li>
     *   <li>the proxy streams and proxy socket are closed on the failure path
     *       too, rather than only on success.</li>
     * </ul>
     *
     * @param extendedBlock block to replace
     * @param storageType   target storage type for the new replica
     * @param token         block access token presented by the caller
     * @param str           deletion hint forwarded to the namenode notification
     * @param datanodeInfo  proxy source datanode holding the block
     * @param str2          storage identifier passed to the access check, the
     *                      storage move and the block receiver (presumably the
     *                      target storage ID; confirm against caller)
     * @throws IOException if the access check fails or the move/copy fails; the
     *                     error response is still sent before the exception
     *                     propagates
     */
    public void replaceBlock(ExtendedBlock extendedBlock, StorageType storageType, Token<BlockTokenIdentifier> token, String str, DatanodeInfo datanodeInfo, String str2) throws IOException {
        updateCurrentThreadName("Replacing block " + extendedBlock + " from " + str);
        DataOutputStream replyOut = new DataOutputStream(getOutputStream());
        checkAccess(replyOut, true, extendedBlock, token, Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE, new StorageType[]{storageType}, new String[]{str2});
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            String msg = "Not able to receive block " + extendedBlock.getBlockId() + " from " + this.peer.getRemoteAddressString() + " because threads quota=" + this.dataXceiverServer.balanceThrottler.getMaxConcurrentMovers() + " is exceeded.";
            LOG.warn(msg);
            sendResponse(DataTransferProtos.Status.ERROR, msg);
            return;
        }
        Socket proxySock = null;
        DataOutputStream proxyOut = null;
        DataInputStream proxyReply = null;
        // Pessimistic default so that any abnormal exit is reported as a failure;
        // promoted to SUCCESS only once the whole operation has completed.
        DataTransferProtos.Status opStatus = DataTransferProtos.Status.ERROR;
        String errMsg = null;
        try {
            if (datanodeInfo.equals(this.datanode.getDatanodeId())) {
                // Source is this datanode: move the replica between local storages.
                ReplicaInfo oldReplica = this.datanode.data.moveBlockAcrossStorage(extendedBlock, storageType, str2);
                if (oldReplica != null) {
                    LOG.info("Moved {} from StorageType {} to {}", new Object[]{extendedBlock, oldReplica.getVolume().getStorageType(), storageType});
                }
            } else {
                extendedBlock.setNumBytes(this.dataXceiverServer.estimateBlockSize);
                // Connect to the proxy source and set up (possibly SASL-wrapped) streams.
                String proxyAddr = datanodeInfo.getXferAddr(this.connectToDnViaHostname);
                LOG.debug("Connecting to datanode {}", proxyAddr);
                InetSocketAddress proxySockAddr = NetUtils.createSocketAddr(proxyAddr);
                proxySock = this.datanode.newSocket();
                NetUtils.connect(proxySock, proxySockAddr, this.dnConf.socketTimeout);
                proxySock.setTcpNoDelay(this.dnConf.getDataTransferServerTcpNoDelay());
                proxySock.setSoTimeout(this.dnConf.socketTimeout);
                proxySock.setKeepAlive(true);
                IOStreamPair saslStreams = this.datanode.saslClient.socketSend(proxySock, NetUtils.getOutputStream(proxySock, this.dnConf.socketWriteTimeout), NetUtils.getInputStream(proxySock), this.datanode.getDataEncryptionKeyFactoryForBlock(extendedBlock), token, datanodeInfo);
                proxyOut = new DataOutputStream(new BufferedOutputStream(saslStreams.out, this.smallBufferSize));
                proxyReply = new DataInputStream(new BufferedInputStream(saslStreams.in, this.ioFileBufferSize));

                // Ask the proxy to copy the block to us and validate its response.
                new Sender(proxyOut).copyBlock(extendedBlock, token);
                DataTransferProtos.BlockOpResponseProto copyResponse = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(proxyReply));
                DataTransferProtoUtil.checkBlockOpStatus(copyResponse, "copy block " + extendedBlock + " from " + proxySock.getRemoteSocketAddress(), true);

                // Receive the block data using the checksum settings announced by the proxy.
                setCurrentBlockReceiver(getBlockReceiver(extendedBlock, storageType, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0L, 0L, 0L, "", null, this.datanode, DataTransferProtoUtil.fromProto(copyResponse.getReadOpChecksumInfo().getChecksum()), CachingStrategy.newDropBehind(), false, false, str2));
                this.blockReceiver.receiveBlock(null, null, replyOut, null, this.dataXceiverServer.balanceThrottler, null, true);

                Replica replica = this.blockReceiver.getReplica();
                this.datanode.notifyNamenodeReceivedBlock(extendedBlock, str, replica.getStorageUuid(), replica.isOnTransientStorage());
                LOG.info("Moved {} from {}, delHint={}", new Object[]{extendedBlock, this.peer.getRemoteAddressString(), str});
            }
            opStatus = DataTransferProtos.Status.SUCCESS;
        } catch (IOException ioe) {
            if (ioe instanceof BlockPinningException) {
                opStatus = DataTransferProtos.Status.ERROR_BLOCK_PINNED;
            }
            errMsg = "opReplaceBlock " + extendedBlock + " received exception " + ioe;
            LOG.info(errMsg);
            // NOTE(review): the decompiled condition here was the constant
            // `0 == 0`, so the counter is incremented unconditionally; the
            // original source may have skipped it for some failures - confirm.
            incrDatanodeNetworkErrors();
            throw ioe;
        } finally {
            if (opStatus == DataTransferProtos.Status.SUCCESS && proxyReply != null) {
                try {
                    // Consume the trailing character the proxy writes after a successful copy.
                    proxyReply.readChar();
                } catch (IOException ignored) {
                    // Best effort: the block has already been received.
                }
            }
            this.dataXceiverServer.balanceThrottler.release();
            try {
                sendResponse(opStatus, errMsg);
            } catch (IOException replyFailure) {
                LOG.warn("Error writing reply back to {}", this.peer.getRemoteAddressString());
                incrDatanodeNetworkErrors();
            }
            IOUtils.closeStream(proxyOut);
            IOUtils.closeStream(this.blockReceiver);
            IOUtils.closeStream(proxyReply);
            // Covers failures that happen before the proxy streams were created.
            IOUtils.closeStream(proxySock);
            IOUtils.closeStream(replyOut);
        }
        // Only reached on success.
        this.datanode.metrics.addReplaceBlockOp(elapsed());
    }

    /**
     * Factory seam for {@link BlockReceiver}: forwards every argument, in the
     * same order, to the {@code BlockReceiver} constructor. Kept as a separate
     * package-private method so tests can override it.
     *
     * @return a newly constructed {@code BlockReceiver}
     * @throws IOException if the {@code BlockReceiver} constructor throws
     */
    @VisibleForTesting
    BlockReceiver getBlockReceiver(ExtendedBlock extendedBlock, StorageType storageType, DataInputStream dataInputStream, String str, String str2, BlockConstructionStage blockConstructionStage, long j, long j2, long j3, String str3, DatanodeInfo datanodeInfo, DataNode dataNode, DataChecksum dataChecksum, CachingStrategy cachingStrategy, boolean z, boolean z2, String str4) throws IOException {
        final BlockReceiver receiver = new BlockReceiver(
                extendedBlock, storageType, dataInputStream,
                str, str2, blockConstructionStage,
                j, j2, j3,
                str3, datanodeInfo, dataNode,
                dataChecksum, cachingStrategy,
                z, z2, str4);
        return receiver;
    }

    /**
     * Wraps this xceiver's output stream in a buffer of {@code smallBufferSize}
     * bytes and exposes it as a {@link DataOutputStream}.
     *
     * @return a new buffered data stream over {@code getOutputStream()}
     */
    @VisibleForTesting
    DataOutputStream getBufferedOutputStream() {
        final OutputStream raw = getOutputStream();
        final BufferedOutputStream buffered = new BufferedOutputStream(raw, this.smallBufferSize);
        return new DataOutputStream(buffered);
    }

    /**
     * @return the difference between the current monotonic clock reading and
     *         {@code opStartTime}
     */
    private long elapsed() {
        final long now = Time.monotonicNow();
        return now - this.opStartTime;
    }

    /**
     * Writes a status response (with optional message) to this xceiver's
     * output stream.
     *
     * @param status status code to send
     * @param str    optional message; may be {@code null}
     * @throws IOException if writing or flushing the response fails
     */
    private void sendResponse(DataTransferProtos.Status status, String str) throws IOException {
        final OutputStream out = getOutputStream();
        writeResponse(status, str, out);
    }

    /**
     * Builds a {@code BlockOpResponseProto} with the given status and optional
     * message, writes it length-delimited to the stream, and flushes.
     *
     * @param status       status code to send
     * @param str          optional message; omitted from the response when {@code null}
     * @param outputStream destination stream; flushed but not closed
     * @throws IOException if writing or flushing fails
     */
    private static void writeResponse(DataTransferProtos.Status status, String str, OutputStream outputStream) throws IOException {
        final DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder();
        response.setStatus(status);
        if (null != str) {
            response.setMessage(str);
        }
        final DataTransferProtos.BlockOpResponseProto message = response.build();
        message.writeDelimitedTo(outputStream);
        outputStream.flush();
    }

    /**
     * Writes a SUCCESS response that carries the sender's checksum and chunk
     * offset, then flushes the stream.
     *
     * @param blockSender      source of the checksum and offset
     * @param dataOutputStream destination stream; flushed but not closed
     * @throws IOException if writing or flushing fails
     */
    private void writeSuccessWithChecksumInfo(BlockSender blockSender, DataOutputStream dataOutputStream) throws IOException {
        final DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder()
                .setStatus(DataTransferProtos.Status.SUCCESS);
        final DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = DataTransferProtos.ReadOpChecksumInfoProto.newBuilder()
                .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
                .setChunkOffset(blockSender.getOffset())
                .build();
        response.setReadOpChecksumInfo(checksumInfo);
        response.build().writeDelimitedTo(dataOutputStream);
        dataOutputStream.flush();
    }

    /**
     * Reports one network error to the datanode, keyed by this connection's
     * {@code remoteAddressWithoutPort}.
     */
    private void incrDatanodeNetworkErrors() {
        datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort);
    }

    /**
     * Ensures the datanode is registered for the block's block pool, waiting
     * for the registration to appear if necessary.
     *
     * <p>The registration is looked up once up front. If that fails, the lookup
     * is retried once per second until the configured block-pool-ready timeout
     * (compared in seconds) has elapsed.
     *
     * <p>If the thread is interrupted while waiting, its interrupt status is
     * restored and the {@link InterruptedException} is attached as the cause of
     * the thrown {@link IOException}.
     *
     * @param extendedBlock block whose block pool must be ready
     * @throws IOException if the wait is interrupted, or if the registration is
     *                     still unavailable when the timeout expires
     */
    void checkAndWaitForBP(ExtendedBlock extendedBlock) throws IOException {
        final String blockPoolId = extendedBlock.getBlockPoolId();

        // Optimistic first attempt: normally the registration is already there.
        try {
            this.datanode.getDNRegistrationForBP(blockPoolId);
            return;
        } catch (IOException notRegisteredYet) {
            // Not registered yet; fall through to the timed retry loop.
        }

        final long bpReadyTimeout = this.dnConf.getBpReadyTimeout();
        final StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        while (stopWatch.now(TimeUnit.SECONDS) <= bpReadyTimeout) {
            try {
                this.datanode.getDNRegistrationForBP(blockPoolId);
                return;
            } catch (IOException stillNotRegistered) {
                // Keep waiting.
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException ie) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while serving request. Aborting.", ie);
            }
        }
        throw new IOException("Not ready to serve the block pool, " + blockPoolId + ".");
    }

    /**
     * Convenience overload of {@code checkAccess} for operations that carry no
     * storage types or storage string array; both are passed as {@code null}.
     *
     * @throws IOException if the block pool is not ready or the token is rejected
     */
    private void checkAccess(OutputStream outputStream, boolean z, ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, Op op, BlockTokenIdentifier.AccessMode accessMode) throws IOException {
        final StorageType[] noStorageTypes = null;
        final String[] noStorageIds = null;
        checkAccess(outputStream, z, extendedBlock, token, op, accessMode, noStorageTypes, noStorageIds);
    }

    /**
     * Waits for the block pool to be ready and, when block tokens are enabled,
     * verifies the caller's token for the requested access mode.
     *
     * <p>On an invalid token, an {@code ERROR_ACCESS_TOKEN} response is written
     * to {@code outputStream} if {@code z} is set (including this datanode's
     * transfer address as the first bad link for WRITE access), a warning is
     * logged, and the {@code InvalidToken} exception is rethrown.
     *
     * <p>The stream is now closed on every rejection path. Previously it was
     * closed only when writing the error response itself failed, which left it
     * open after a successfully delivered rejection.
     *
     * @param outputStream  stream for the error reply; closed if the token is rejected
     * @param z             whether to send an error reply on rejection
     * @param extendedBlock block being accessed
     * @param token         block access token presented by the caller
     * @param op            operation being performed (used for logging)
     * @param accessMode    access mode the token must grant
     * @param storageTypeArr storage types forwarded to the token check; may be {@code null}
     * @param strArr        string array forwarded to the token check; may be {@code null}
     *                      (presumably storage IDs; confirm against the secret manager)
     * @throws IOException if the block pool is not ready, the token is invalid,
     *                     or writing the error reply fails
     */
    private void checkAccess(OutputStream outputStream, boolean z, ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, Op op, BlockTokenIdentifier.AccessMode accessMode, StorageType[] storageTypeArr, String[] strArr) throws IOException {
        checkAndWaitForBP(extendedBlock);
        if (!this.datanode.isBlockTokenEnabled) {
            return;
        }
        LOG.debug("Checking block access token for block '{}' with mode '{}'", Long.valueOf(extendedBlock.getBlockId()), accessMode);
        try {
            this.datanode.blockPoolTokenSecretManager.checkAccess(token, (String) null, extendedBlock, accessMode, storageTypeArr, strArr);
        } catch (SecretManager.InvalidToken e) {
            try {
                if (z) {
                    DataTransferProtos.BlockOpResponseProto.Builder resp = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.ERROR_ACCESS_TOKEN);
                    if (accessMode == BlockTokenIdentifier.AccessMode.WRITE) {
                        resp.setFirstBadLink(this.datanode.getDNRegistrationForBP(extendedBlock.getBlockPoolId()).getXferAddr());
                    }
                    resp.build().writeDelimitedTo(outputStream);
                    outputStream.flush();
                }
                LOG.warn("Block token verification failed: op={}, remoteAddress={}, message={}", new Object[]{op, this.remoteAddress, e.getLocalizedMessage()});
                throw e;
            } finally {
                // The request is rejected either way; never leave the reply stream open.
                IOUtils.closeStream(outputStream);
            }
        }
    }

    // Class initializer as emitted by the decompiler.
    static {
        // Compiler-generated flag backing `assert` statements in this class.
        $assertionsDisabled = !DataXceiver.class.desiredAssertionStatus();
        // Reuse the DataNode's loggers rather than creating class-specific ones.
        LOG = DataNode.LOG;
        ClientTraceLog = DataNode.ClientTraceLog;
    }
}
