/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper;
import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
import org.apache.hadoop.hdfs.server.datanode.BlockSender;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;

class DataXceiver
extends Receiver
implements Runnable {
    public static final Logger LOG = DataNode.LOG;
    static final Logger CLIENT_TRACE_LOG = DataNode.CLIENT_TRACE_LOG;
    private Peer peer;
    private final String remoteAddress;
    private final String remoteAddressWithoutPort;
    private final String localAddress;
    private final DataNode datanode;
    private final DNConf dnConf;
    private final DataXceiverServer dataXceiverServer;
    private final boolean connectToDnViaHostname;
    private long opStartTime;
    private final InputStream socketIn;
    private OutputStream socketOut;
    private BlockReceiver blockReceiver = null;
    private final int ioFileBufferSize;
    private final int smallBufferSize;
    private Thread xceiver = null;
    private String previousOpClientName;

    public static DataXceiver create(Peer peer, DataNode dn, DataXceiverServer dataXceiverServer) throws IOException {
        return new DataXceiver(peer, dn, dataXceiverServer);
    }

    private DataXceiver(Peer peer, DataNode datanode, DataXceiverServer dataXceiverServer) throws IOException {
        super(FsTracer.get(null));
        this.peer = peer;
        this.dnConf = datanode.getDnConf();
        this.socketIn = peer.getInputStream();
        this.socketOut = peer.getOutputStream();
        this.datanode = datanode;
        this.dataXceiverServer = dataXceiverServer;
        this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
        this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize((Configuration)datanode.getConf());
        this.smallBufferSize = DFSUtilClient.getSmallBufferSize((Configuration)datanode.getConf());
        this.remoteAddress = peer.getRemoteAddressString();
        int colonIdx = this.remoteAddress.indexOf(58);
        this.remoteAddressWithoutPort = colonIdx < 0 ? this.remoteAddress : this.remoteAddress.substring(0, colonIdx);
        this.localAddress = peer.getLocalAddressString();
        LOG.debug("Number of active connections is: {}", (Object)datanode.getXceiverCount());
    }

    private void updateCurrentThreadName(String status) {
        StringBuilder sb = new StringBuilder();
        sb.append("DataXceiver for client ");
        if (this.previousOpClientName != null) {
            sb.append(this.previousOpClientName).append(" at ");
        }
        sb.append(this.remoteAddress);
        if (status != null) {
            sb.append(" [").append(status).append("]");
        }
        Thread.currentThread().setName(sb.toString());
    }

    DataNode getDataNode() {
        return this.datanode;
    }

    private OutputStream getOutputStream() {
        return this.socketOut;
    }

    public void sendOOB() throws IOException, InterruptedException {
        BlockReceiver br = this.getCurrentBlockReceiver();
        if (br == null) {
            return;
        }
        LOG.info("Sending OOB to peer: {}", (Object)this.peer);
        br.sendOOB();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopWriter() {
        DataXceiver dataXceiver = this;
        synchronized (dataXceiver) {
            if (this.getCurrentBlockReceiver() == null) {
                return;
            }
            this.xceiver.interrupt();
        }
        LOG.info("Stopped the writer: {}", (Object)this.peer);
    }

    private synchronized void setCurrentBlockReceiver(BlockReceiver br) {
        this.blockReceiver = br;
    }

    private synchronized BlockReceiver getCurrentBlockReceiver() {
        return this.blockReceiver;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        int opsProcessed = 0;
        Op op = null;
        try {
            DataXceiver dataXceiver = this;
            synchronized (dataXceiver) {
                this.xceiver = Thread.currentThread();
            }
            this.dataXceiverServer.addPeer(this.peer, Thread.currentThread(), this);
            this.peer.setWriteTimeout(this.datanode.getDnConf().socketWriteTimeout);
            InputStream input = this.socketIn;
            try {
                IOStreamPair saslStreams = this.datanode.saslServer.receive(this.peer, this.socketOut, this.socketIn, this.datanode.getXferAddress().getPort(), this.datanode.getDatanodeId());
                input = new BufferedInputStream(saslStreams.in, this.smallBufferSize);
                this.socketOut = saslStreams.out;
            }
            catch (InvalidMagicNumberException imne) {
                if (imne.isHandshake4Encryption()) {
                    LOG.info("Failed to read expected encryption handshake from client at {}. Perhaps the client is running an older version of Hadoop which does not support encryption", (Object)this.peer.getRemoteAddressString(), (Object)imne);
                } else {
                    LOG.info("Failed to read expected SASL data transfer protection handshake from client at {}. Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection", (Object)this.peer.getRemoteAddressString(), (Object)imne);
                }
                this.collectThreadLocalStates();
                LOG.debug("{}:Number of active connections is: {}", (Object)this.datanode.getDisplayName(), (Object)this.datanode.getXceiverCount());
                this.updateCurrentThreadName("Cleaning up");
                if (this.peer != null) {
                    this.dataXceiverServer.closePeer(this.peer);
                    IOUtils.closeStream((Closeable)this.in);
                }
                return;
            }
            super.initialize(new DataInputStream(input));
            do {
                this.updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
                try {
                    if (opsProcessed != 0) {
                        assert (this.dnConf.socketKeepaliveTimeout > 0);
                        this.peer.setReadTimeout(this.dnConf.socketKeepaliveTimeout);
                    } else {
                        this.peer.setReadTimeout(this.dnConf.socketTimeout);
                    }
                    op = this.readOp();
                }
                catch (InterruptedIOException ignored) {
                    break;
                }
                catch (EOFException | ClosedChannelException e) {
                    LOG.debug("Cached {} closing after {} ops.  This message is usually benign.", (Object)this.peer, (Object)opsProcessed);
                    break;
                }
                catch (IOException err) {
                    this.incrDatanodeNetworkErrors();
                    throw err;
                }
                if (opsProcessed != 0) {
                    this.peer.setReadTimeout(this.dnConf.socketTimeout);
                }
                this.opStartTime = Time.monotonicNow();
                this.processOp(op);
                ++opsProcessed;
            } while (this.peer != null && !this.peer.isClosed() && this.dnConf.socketKeepaliveTimeout > 0);
        }
        catch (Throwable t) {
            String s = this.datanode.getDisplayName() + ":DataXceiver error processing " + (op == null ? "unknown" : op.name()) + " operation  src: " + this.remoteAddress + " dst: " + this.localAddress;
            if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace(s, t);
                } else {
                    LOG.info("{}; {}", (Object)s, (Object)t.toString());
                }
            } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
                String s1 = "Likely the client has stopped reading, disconnecting it";
                s1 = s1 + " (" + s + ")";
                if (LOG.isTraceEnabled()) {
                    LOG.trace(s1, t);
                } else {
                    LOG.info("{}; {}", (Object)s1, (Object)t.toString());
                }
            } else if (t instanceof SecretManager.InvalidToken || t.getCause() instanceof SecretManager.InvalidToken) {
                LOG.trace(s, t);
            } else {
                LOG.error(s, t);
            }
        }
        finally {
            this.collectThreadLocalStates();
            LOG.debug("{}:Number of active connections is: {}", (Object)this.datanode.getDisplayName(), (Object)this.datanode.getXceiverCount());
            this.updateCurrentThreadName("Cleaning up");
            if (this.peer != null) {
                this.dataXceiverServer.closePeer(this.peer);
                IOUtils.closeStream((Closeable)this.in);
            }
        }
    }

    private void collectThreadLocalStates() {
        if (this.datanode.getDnConf().peerStatsEnabled && this.datanode.getPeerMetrics() != null) {
            this.datanode.getPeerMetrics().collectThreadLocalStates();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void requestShortCircuitFds(ExtendedBlock blk, Token<BlockTokenIdentifier> token, ShortCircuitShm.SlotId slotId, int maxVersion, boolean supportsReceiptVerification) throws IOException {
        boolean success;
        Closeable[] fis;
        block18: {
            this.updateCurrentThreadName("Passing file descriptors for block " + blk);
            DataOutputStream out = this.getBufferedOutputStream();
            this.checkAccess(out, true, blk, token, Op.REQUEST_SHORT_CIRCUIT_FDS, BlockTokenIdentifier.AccessMode.READ, null, null);
            DataTransferProtos.BlockOpResponseProto.Builder bld = DataTransferProtos.BlockOpResponseProto.newBuilder();
            fis = null;
            ShortCircuitShm.SlotId registeredSlotId = null;
            success = false;
            try {
                try {
                    if (this.peer.getDomainSocket() == null) {
                        throw new IOException("You cannot pass file descriptors over anything but a UNIX domain socket.");
                    }
                    if (slotId != null) {
                        boolean isCached = this.datanode.data.isCached(blk.getBlockPoolId(), blk.getBlockId());
                        this.datanode.shortCircuitRegistry.registerSlot(ExtendedBlockId.fromExtendedBlock((ExtendedBlock)blk), slotId, isCached);
                        registeredSlotId = slotId;
                    }
                    Preconditions.checkState(((fis = this.datanode.requestShortCircuitFdsForRead(blk, token, maxVersion)) != null ? 1 : 0) != 0);
                    bld.setStatus(DataTransferProtos.Status.SUCCESS);
                    bld.setShortCircuitAccessVersion(1);
                }
                catch (DataNode.ShortCircuitFdsVersionException e) {
                    bld.setStatus(DataTransferProtos.Status.ERROR_UNSUPPORTED);
                    bld.setShortCircuitAccessVersion(1);
                    bld.setMessage(e.getMessage());
                }
                catch (DataNode.ShortCircuitFdsUnsupportedException e) {
                    bld.setStatus(DataTransferProtos.Status.ERROR_UNSUPPORTED);
                    bld.setMessage(e.getMessage());
                }
                catch (IOException e) {
                    bld.setStatus(DataTransferProtos.Status.ERROR);
                    bld.setMessage(e.getMessage());
                    LOG.error("Request short-circuit read file descriptor failed with unknown error.", (Throwable)e);
                }
                bld.build().writeDelimitedTo(this.socketOut);
                if (fis != null) {
                    FileDescriptor[] fds = new FileDescriptor[fis.length];
                    for (int i = 0; i < fds.length; ++i) {
                        fds[i] = ((FileInputStream)fis[i]).getFD();
                    }
                    byte[] buf = new byte[]{supportsReceiptVerification ? (byte)DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION.getNumber() : (byte)DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION.getNumber()};
                    DomainSocket sock = this.peer.getDomainSocket();
                    sock.sendFileDescriptors(fds, buf, 0, buf.length);
                    if (supportsReceiptVerification) {
                        LOG.trace("Reading receipt verification byte for {}", (Object)slotId);
                        int val = sock.getInputStream().read();
                        if (val < 0) {
                            throw new EOFException();
                        }
                    } else {
                        LOG.trace("Receipt verification is not enabled on the DataNode. Not verifying {}", (Object)slotId);
                    }
                    success = true;
                }
                if (success || registeredSlotId == null) break block18;
            }
            catch (Throwable throwable) {
                if (!success && registeredSlotId != null) {
                    LOG.info("Unregistering {} because the requestShortCircuitFdsForRead operation failed.", registeredSlotId);
                    this.datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
                }
                if (CLIENT_TRACE_LOG.isInfoEnabled()) {
                    DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(blk.getBlockPoolId());
                    BlockSender.CLIENT_TRACE_LOG.info(String.format("src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS, blockid: %s, srvID: %s, success: %b", blk.getBlockId(), dnR.getDatanodeUuid(), success));
                }
                if (fis != null) {
                    IOUtils.cleanupWithLogger(null, fis);
                }
                throw throwable;
            }
            LOG.info("Unregistering {} because the requestShortCircuitFdsForRead operation failed.", (Object)registeredSlotId);
            this.datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
        }
        if (CLIENT_TRACE_LOG.isInfoEnabled()) {
            DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(blk.getBlockPoolId());
            BlockSender.CLIENT_TRACE_LOG.info(String.format("src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS, blockid: %s, srvID: %s, success: %b", blk.getBlockId(), dnR.getDatanodeUuid(), success));
        }
        if (fis != null) {
            IOUtils.cleanupWithLogger(null, (Closeable[])fis);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseShortCircuitFds(ShortCircuitShm.SlotId slotId) throws IOException {
        boolean success = false;
        try {
            DataTransferProtos.Status status;
            String error;
            try {
                this.datanode.shortCircuitRegistry.unregisterSlot(slotId);
                error = null;
                status = DataTransferProtos.Status.SUCCESS;
            }
            catch (UnsupportedOperationException e) {
                error = "unsupported operation";
                status = DataTransferProtos.Status.ERROR_UNSUPPORTED;
            }
            catch (Throwable e) {
                error = e.getMessage();
                status = DataTransferProtos.Status.ERROR_INVALID;
            }
            DataTransferProtos.ReleaseShortCircuitAccessResponseProto.Builder bld = DataTransferProtos.ReleaseShortCircuitAccessResponseProto.newBuilder();
            bld.setStatus(status);
            if (error != null) {
                bld.setError(error);
            }
            bld.build().writeDelimitedTo(this.socketOut);
            success = true;
        }
        catch (Throwable throwable) {
            if (CLIENT_TRACE_LOG.isInfoEnabled()) {
                BlockSender.CLIENT_TRACE_LOG.info(String.format("src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS, shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b", slotId.getShmId().getHi(), slotId.getShmId().getLo(), slotId.getSlotIdx(), this.datanode.getDatanodeUuid(), success));
            }
            throw throwable;
        }
        if (CLIENT_TRACE_LOG.isInfoEnabled()) {
            BlockSender.CLIENT_TRACE_LOG.info(String.format("src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS, shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b", slotId.getShmId().getHi(), slotId.getShmId().getLo(), slotId.getSlotIdx(), this.datanode.getDatanodeUuid(), success));
        }
    }

    private void sendShmErrorResponse(DataTransferProtos.Status status, String error) throws IOException {
        DataTransferProtos.ShortCircuitShmResponseProto.newBuilder().setStatus(status).setError(error).build().writeDelimitedTo(this.socketOut);
    }

    private void sendShmSuccessResponse(DomainSocket sock, ShortCircuitRegistry.NewShmInfo shmInfo) throws IOException {
        DataNodeFaultInjector.get().sendShortCircuitShmResponse();
        DataTransferProtos.ShortCircuitShmResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setId(PBHelperClient.convert((ShortCircuitShm.ShmId)shmInfo.getShmId())).build().writeDelimitedTo(this.socketOut);
        byte[] buf = new byte[]{0};
        FileDescriptor[] shmFdArray = new FileDescriptor[]{shmInfo.getFileStream().getFD()};
        sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void requestShortCircuitShm(String clientName) throws IOException {
        shmInfo = null;
        success = false;
        sock = this.peer.getDomainSocket();
        if (sock == null) {
            this.sendShmErrorResponse(DataTransferProtos.Status.ERROR_INVALID, "Bad request from " + this.peer + ": must request a shared memory segment over a UNIX domain socket.");
        }
        ** GOTO lbl38
        {
            catch (Throwable var7_11) {
                if (DataXceiver.CLIENT_TRACE_LOG.isInfoEnabled()) {
                    if (success) {
                        BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.getShmId().getHi(), shmInfo.getShmId().getLo(), this.datanode.getDatanodeUuid()}));
                    } else {
                        BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
                    }
                }
                if (!success && this.peer == null) {
                    try {
                        DataXceiver.LOG.warn("Failed to send success response back to the client. Shutting down socket for {}", (Object)shmInfo.getShmId());
                        sock.shutdown();
                    }
                    catch (IOException e) {
                        DataXceiver.LOG.warn("Failed to shut down socket in error handler", (Throwable)e);
                    }
                }
                IOUtils.cleanupWithLogger(null, (Closeable[])new Closeable[]{shmInfo});
                throw var7_11;
            }
            if (DataXceiver.CLIENT_TRACE_LOG.isInfoEnabled()) {
                if (success) {
                    BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.getShmId().getHi(), shmInfo.getShmId().getLo(), this.datanode.getDatanodeUuid()}));
                } else {
                    BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
                }
            }
            if (!success && this.peer == null) {
                try {
                    DataXceiver.LOG.warn("Failed to send success response back to the client. Shutting down socket for {}", (Object)shmInfo.getShmId());
                    sock.shutdown();
                }
                catch (IOException e) {
                    DataXceiver.LOG.warn("Failed to shut down socket in error handler", (Throwable)e);
                }
            }
            IOUtils.cleanupWithLogger(null, (Closeable[])new Closeable[]{shmInfo});
            return;
lbl38:
            // 1 sources

            try {
                shmInfo = this.datanode.shortCircuitRegistry.createNewMemorySegment(clientName, sock);
                this.releaseSocket();
            }
            catch (UnsupportedOperationException e) {
                this.sendShmErrorResponse(DataTransferProtos.Status.ERROR_UNSUPPORTED, "This datanode has not been configured to support short-circuit shared memory segments.");
                if (DataXceiver.CLIENT_TRACE_LOG.isInfoEnabled()) {
                    if (success) {
                        BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.getShmId().getHi(), shmInfo.getShmId().getLo(), this.datanode.getDatanodeUuid()}));
                    } else {
                        BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
                    }
                }
                if (!success && this.peer == null) {
                    try {
                        DataXceiver.LOG.warn("Failed to send success response back to the client. Shutting down socket for {}", (Object)shmInfo.getShmId());
                        sock.shutdown();
                    }
                    catch (IOException e) {
                        DataXceiver.LOG.warn("Failed to shut down socket in error handler", (Throwable)e);
                    }
                }
                IOUtils.cleanupWithLogger(null, (Closeable[])new Closeable[]{shmInfo});
                return;
            }
            catch (IOException e) {}
            {
                this.sendShmErrorResponse(DataTransferProtos.Status.ERROR, "Failed to create shared file descriptor: " + e.getMessage());
            }
            if (DataXceiver.CLIENT_TRACE_LOG.isInfoEnabled()) {
                if (success) {
                    BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.getShmId().getHi(), shmInfo.getShmId().getLo(), this.datanode.getDatanodeUuid()}));
                } else {
                    BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
                }
            }
            if (!success && this.peer == null) {
                try {
                    DataXceiver.LOG.warn("Failed to send success response back to the client. Shutting down socket for {}", (Object)shmInfo.getShmId());
                    sock.shutdown();
                }
                catch (IOException e) {
                    DataXceiver.LOG.warn("Failed to shut down socket in error handler", (Throwable)e);
                }
            }
            IOUtils.cleanupWithLogger(null, (Closeable[])new Closeable[]{shmInfo});
            return;
        }
        {
            this.sendShmSuccessResponse(sock, shmInfo);
            success = true;
        }
        if (DataXceiver.CLIENT_TRACE_LOG.isInfoEnabled()) {
            if (success) {
                BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: %016x%016x, srvID: %s, success: true", new Object[]{clientName, shmInfo.getShmId().getHi(), shmInfo.getShmId().getLo(), this.datanode.getDatanodeUuid()}));
            } else {
                BlockSender.CLIENT_TRACE_LOG.info(String.format("cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: %s, success: false", new Object[]{clientName, this.datanode.getDatanodeUuid()}));
            }
        }
        if (!success && this.peer == null) {
            try {
                DataXceiver.LOG.warn("Failed to send success response back to the client. Shutting down socket for {}", (Object)shmInfo.getShmId());
                sock.shutdown();
            }
            catch (IOException e) {
                DataXceiver.LOG.warn("Failed to shut down socket in error handler", (Throwable)e);
            }
        }
        IOUtils.cleanupWithLogger(null, (Closeable[])new Closeable[]{shmInfo});
    }

    void releaseSocket() {
        this.dataXceiverServer.releasePeer(this.peer);
        this.peer = null;
    }

    /*
     * Loose catch block
     */
    public void readBlock(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, String clientName, long blockOffset, long length, boolean sendChecksum, CachingStrategy cachingStrategy) throws IOException {
        this.previousOpClientName = clientName;
        long read = 0L;
        this.updateCurrentThreadName("Sending block " + block);
        OutputStream baseStream = this.getOutputStream();
        DataOutputStream out = this.getBufferedOutputStream();
        this.checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
        BlockSender blockSender = null;
        DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(block.getBlockPoolId());
        String clientTraceFmt = clientName.length() > 0 && CLIENT_TRACE_LOG.isInfoEnabled() ? String.format("src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, offset: %s, srvID: %s, blockid: %s, duration(ns): %s", this.localAddress, this.remoteAddress, "", "%d", "HDFS_READ", clientName, "%d", dnR.getDatanodeUuid(), block, "%d") : dnR + " Served block " + block + " to " + this.remoteAddress;
        try {
            try {
                blockSender = new BlockSender(block, blockOffset, length, true, false, sendChecksum, this.datanode, clientTraceFmt, cachingStrategy);
            }
            catch (IOException e) {
                String msg = "opReadBlock " + block + " received exception " + e;
                LOG.info(msg);
                this.sendResponse(DataTransferProtos.Status.ERROR, msg);
                throw e;
            }
            this.writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(this.getOutputStream()));
            long beginRead = Time.monotonicNow();
            read = blockSender.sendBlock(out, baseStream, null);
            long duration = Time.monotonicNow() - beginRead;
            if (blockSender.didSendEntireByteRange()) {
                try {
                    DataTransferProtos.ClientReadStatusProto stat = DataTransferProtos.ClientReadStatusProto.parseFrom((InputStream)PBHelperClient.vintPrefixed((InputStream)this.in));
                    if (!stat.hasStatus()) {
                        LOG.warn("Client {} did not send a valid status code after reading. Will close connection.", (Object)this.peer.getRemoteAddressString());
                        IOUtils.closeStream((Closeable)out);
                    }
                }
                catch (IOException ioe) {
                    LOG.debug("Error reading client status response. Will close connection.", (Throwable)ioe);
                    IOUtils.closeStream((Closeable)out);
                    this.incrDatanodeNetworkErrors();
                }
            } else {
                IOUtils.closeStream((Closeable)out);
            }
            this.datanode.metrics.incrBytesRead((int)read);
            this.datanode.metrics.incrBlocksRead();
            this.datanode.metrics.incrTotalReadTime(duration);
        }
        catch (SocketException ignored) {
            LOG.trace("{}:Ignoring exception while serving {} to {}", new Object[]{dnR, block, this.remoteAddress, ignored});
            this.datanode.metrics.incrBlocksRead();
            IOUtils.closeStream((Closeable)out);
            IOUtils.closeStream((Closeable)blockSender);
        }
        catch (IOException ioe) {
            if (!(ioe instanceof SocketTimeoutException)) {
                LOG.warn("{}:Got exception while serving {} to {}", new Object[]{dnR, block, this.remoteAddress, ioe});
                this.incrDatanodeNetworkErrors();
            }
            this.datanode.handleBadBlock(block, ioe, false);
            throw ioe;
            {
                catch (Throwable throwable) {
                    IOUtils.closeStream(blockSender);
                    throw throwable;
                }
            }
        }
        IOUtils.closeStream((Closeable)blockSender);
        this.datanode.metrics.addReadBlockOp(this.elapsed());
        this.datanode.metrics.incrReadsFromClient(this.peer.isLocal(), read);
    }

    public void writeBlock(ExtendedBlock block, StorageType storageType, Token<BlockTokenIdentifier> blockToken, String clientname, DatanodeInfo[] targets, StorageType[] targetStorageTypes, DatanodeInfo srcDataNode, BlockConstructionStage stage, int pipelineSize, long minBytesRcvd, long maxBytesRcvd, long latestGenerationStamp, DataChecksum requestedChecksum, CachingStrategy cachingStrategy, boolean allowLazyPersist, boolean pinning, boolean[] targetPinnings, String storageId, String[] targetStorageIds) throws IOException {
        String[] storageIds;
        int nsi;
        this.previousOpClientName = clientname;
        this.updateCurrentThreadName("Receiving block " + block);
        boolean isDatanode = clientname.length() == 0;
        boolean isClient = !isDatanode;
        boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED;
        allowLazyPersist = allowLazyPersist && (this.dnConf.getAllowNonLocalLazyPersist() || this.peer.isLocal());
        long size = 0L;
        DataOutputStream replyOut = this.getBufferedOutputStream();
        int nst = targetStorageTypes.length;
        StorageType[] storageTypes = new StorageType[nst + 1];
        storageTypes[0] = storageType;
        if (targetStorageTypes.length > 0) {
            System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
        }
        if ((nsi = targetStorageIds.length) > 0) {
            storageIds = new String[nsi + 1];
            storageIds[0] = storageId;
            if (targetStorageTypes.length > 0) {
                System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
            }
        } else {
            storageIds = new String[]{};
        }
        this.checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE, storageTypes, storageIds);
        if (isTransfer && targets.length > 0) {
            throw new IOException(stage + " does not support multiple targets " + Arrays.asList(targets));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("opWriteBlock: stage={}, clientname={}\n  block  ={}, newGs={}, bytesRcvd=[{}, {}]\n  targets={}; pipelineSize={}, srcDataNode={}, pinning={}", new Object[]{stage, clientname, block, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, Arrays.asList(targets), pipelineSize, srcDataNode, pinning});
            LOG.debug("isDatanode={}, isClient={}, isTransfer={}", new Object[]{isDatanode, isClient, isTransfer});
            LOG.debug("writeBlock receive buf size {} tcp no delay {}", (Object)this.peer.getReceiveBufferSize(), (Object)this.peer.getTcpNoDelay());
        }
        ExtendedBlock originalBlock = new ExtendedBlock(block);
        if (block.getNumBytes() == 0L) {
            block.setNumBytes(this.dataXceiverServer.estimateBlockSize);
        }
        LOG.info("Receiving {} src: {} dest: {}", new Object[]{block, this.remoteAddress, this.localAddress});
        DataOutputStream mirrorOut = null;
        DataInputStream mirrorIn = null;
        Socket mirrorSock = null;
        String mirrorNode = null;
        String firstBadLink = "";
        DataTransferProtos.Status mirrorInStatus = DataTransferProtos.Status.SUCCESS;
        try {
            Replica replica;
            if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                this.setCurrentBlockReceiver(this.getBlockReceiver(block, storageType, this.in, this.peer.getRemoteAddressString(), this.peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, this.datanode, requestedChecksum, cachingStrategy, allowLazyPersist, pinning, storageId));
                replica = this.blockReceiver.getReplica();
            } else {
                replica = this.datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
            }
            String storageUuid = replica.getStorageUuid();
            boolean isOnTransientStorage = replica.isOnTransientStorage();
            if (targets.length > 0) {
                InetSocketAddress mirrorTarget = null;
                mirrorNode = targets[0].getXferAddr(this.connectToDnViaHostname);
                LOG.debug("Connecting to datanode {}", (Object)mirrorNode);
                mirrorTarget = NetUtils.createSocketAddr((String)mirrorNode);
                mirrorSock = this.datanode.newSocket();
                try {
                    DataNodeFaultInjector.get().failMirrorConnection();
                    int timeoutValue = this.dnConf.socketTimeout + 5000 * targets.length;
                    int writeTimeout = this.dnConf.socketWriteTimeout + 5000 * targets.length;
                    NetUtils.connect((Socket)mirrorSock, (SocketAddress)mirrorTarget, (int)timeoutValue);
                    mirrorSock.setTcpNoDelay(this.dnConf.getDataTransferServerTcpNoDelay());
                    mirrorSock.setSoTimeout(timeoutValue);
                    mirrorSock.setKeepAlive(true);
                    if (this.dnConf.getTransferSocketSendBufferSize() > 0) {
                        mirrorSock.setSendBufferSize(this.dnConf.getTransferSocketSendBufferSize());
                    }
                    OutputStream unbufMirrorOut = NetUtils.getOutputStream((Socket)mirrorSock, (long)writeTimeout);
                    Object unbufMirrorIn = NetUtils.getInputStream((Socket)mirrorSock);
                    DataEncryptionKeyFactory keyFactory = this.datanode.getDataEncryptionKeyFactoryForBlock(block);
                    SecretKey secretKey = null;
                    if (this.dnConf.overwriteDownstreamDerivedQOP) {
                        String bpid = block.getBlockPoolId();
                        BlockKey blockKey = this.datanode.blockPoolTokenSecretManager.get(bpid).getCurrentKey();
                        secretKey = blockKey.getKey();
                    }
                    IOStreamPair saslStreams = this.datanode.saslClient.socketSend(mirrorSock, unbufMirrorOut, (InputStream)unbufMirrorIn, keyFactory, blockToken, (DatanodeID)targets[0], secretKey);
                    unbufMirrorOut = saslStreams.out;
                    unbufMirrorIn = saslStreams.in;
                    mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, this.smallBufferSize));
                    mirrorIn = new DataInputStream((InputStream)unbufMirrorIn);
                    String targetStorageId = null;
                    if (targetStorageIds.length > 0) {
                        targetStorageId = targetStorageIds[0];
                    }
                    if (targetPinnings != null && targetPinnings.length > 0) {
                        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], blockToken, clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, cachingStrategy, allowLazyPersist, targetPinnings[0], targetPinnings, targetStorageId, targetStorageIds);
                    } else {
                        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], blockToken, clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, cachingStrategy, allowLazyPersist, false, targetPinnings, targetStorageId, targetStorageIds);
                    }
                    mirrorOut.flush();
                    DataNodeFaultInjector.get().writeBlockAfterFlush();
                    if (isClient) {
                        DataTransferProtos.BlockOpResponseProto connectAck = DataTransferProtos.BlockOpResponseProto.parseFrom((InputStream)PBHelperClient.vintPrefixed((InputStream)mirrorIn));
                        mirrorInStatus = connectAck.getStatus();
                        firstBadLink = connectAck.getFirstBadLink();
                        if (mirrorInStatus != DataTransferProtos.Status.SUCCESS) {
                            LOG.debug("Datanode {} got response for connectack  from downstream datanode with firstbadlink as {}", (Object)targets.length, (Object)firstBadLink);
                        }
                    }
                }
                catch (IOException e) {
                    if (isClient) {
                        DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.ERROR).setFirstBadLink(targets[0].getXferAddr()).build().writeDelimitedTo((OutputStream)replyOut);
                        replyOut.flush();
                    }
                    IOUtils.closeStream(mirrorOut);
                    mirrorOut = null;
                    IOUtils.closeStream(mirrorIn);
                    mirrorIn = null;
                    IOUtils.closeSocket((Socket)mirrorSock);
                    mirrorSock = null;
                    if (isClient) {
                        LOG.error("{}:Exception transfering block {} to mirror {}", new Object[]{this.datanode, block, mirrorNode, e});
                        throw e;
                    }
                    LOG.info("{}:Exception transfering {} to mirror {}- continuing without the mirror", new Object[]{this.datanode, block, mirrorNode, e});
                    this.incrDatanodeNetworkErrors();
                }
            }
            if (isClient && !isTransfer) {
                if (mirrorInStatus != DataTransferProtos.Status.SUCCESS) {
                    LOG.debug("Datanode {} forwarding connect ack to upstream firstbadlink is {}", (Object)targets.length, (Object)firstBadLink);
                }
                DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(mirrorInStatus).setFirstBadLink(firstBadLink).build().writeDelimitedTo((OutputStream)replyOut);
                replyOut.flush();
            }
            if (this.blockReceiver != null) {
                String mirrorAddr = mirrorSock == null ? null : mirrorNode;
                this.blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, this.dataXceiverServer.getWriteThrottler(), targets, false);
                if (isTransfer) {
                    LOG.trace("TRANSFER: send close-ack");
                    DataXceiver.writeResponse(DataTransferProtos.Status.SUCCESS, null, replyOut);
                }
            }
            if (isClient && stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                block.setGenerationStamp(latestGenerationStamp);
                block.setNumBytes(minBytesRcvd);
            }
            if (isDatanode || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                this.datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
                LOG.info("Received {} src: {} dest: {} of size {}", new Object[]{block, this.remoteAddress, this.localAddress, block.getNumBytes()});
            }
            if (isClient) {
                size = block.getNumBytes();
            }
        }
        catch (IOException ioe) {
            try {
                LOG.info("opWriteBlock {} received exception {}", (Object)block, (Object)ioe.toString());
                this.incrDatanodeNetworkErrors();
                throw ioe;
            }
            catch (Throwable throwable) {
                IOUtils.closeStream(mirrorOut);
                IOUtils.closeStream(mirrorIn);
                IOUtils.closeStream((Closeable)replyOut);
                IOUtils.closeSocket(mirrorSock);
                IOUtils.closeStream((Closeable)this.blockReceiver);
                this.setCurrentBlockReceiver(null);
                throw throwable;
            }
        }
        IOUtils.closeStream(mirrorOut);
        IOUtils.closeStream(mirrorIn);
        IOUtils.closeStream((Closeable)replyOut);
        IOUtils.closeSocket((Socket)mirrorSock);
        IOUtils.closeStream((Closeable)this.blockReceiver);
        this.setCurrentBlockReceiver(null);
        this.datanode.getMetrics().addWriteBlockOp(this.elapsed());
        this.datanode.getMetrics().incrWritesFromClient(this.peer.isLocal(), size);
    }

    public void transferBlock(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken, String clientName, DatanodeInfo[] targets, StorageType[] targetStorageTypes, String[] targetStorageIds) throws IOException {
        this.previousOpClientName = clientName;
        this.updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
        DataOutputStream out = new DataOutputStream(this.getOutputStream());
        this.checkAccess(out, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes, targetStorageIds);
        try {
            this.datanode.transferReplicaForPipelineRecovery(blk, targets, targetStorageTypes, targetStorageIds, clientName);
            DataXceiver.writeResponse(DataTransferProtos.Status.SUCCESS, null, out);
        }
        catch (IOException ioe) {
            LOG.info("transferBlock {} received exception {}", (Object)blk, (Object)ioe.toString());
            this.incrDatanodeNetworkErrors();
            throw ioe;
        }
        finally {
            IOUtils.closeStream((Closeable)out);
        }
    }

    public void blockChecksum(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, BlockChecksumOptions blockChecksumOptions) throws IOException {
        this.updateCurrentThreadName("Getting checksum for block " + block);
        DataOutputStream out = new DataOutputStream(this.getOutputStream());
        this.checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
        BlockChecksumHelper.ReplicatedBlockChecksumComputer maker = new BlockChecksumHelper.ReplicatedBlockChecksumComputer(this.datanode, block, blockChecksumOptions);
        try {
            ((BlockChecksumHelper.BlockChecksumComputer)maker).compute();
            DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setChecksumResponse(DataTransferProtos.OpBlockChecksumResponseProto.newBuilder().setBytesPerCrc(maker.getBytesPerCRC()).setCrcPerBlock(maker.getCrcPerBlock()).setBlockChecksum(ByteString.copyFrom((byte[])maker.getOutBytes())).setCrcType(PBHelperClient.convert((DataChecksum.Type)maker.getCrcType())).setBlockChecksumOptions(PBHelperClient.convert((BlockChecksumOptions)blockChecksumOptions))).build().writeDelimitedTo((OutputStream)out);
            out.flush();
        }
        catch (IOException ioe) {
            LOG.info("blockChecksum {} received exception {}", (Object)block, (Object)ioe.toString());
            this.incrDatanodeNetworkErrors();
            throw ioe;
        }
        finally {
            IOUtils.closeStream((Closeable)out);
        }
        this.datanode.metrics.addBlockChecksumOp(this.elapsed());
    }

    public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, Token<BlockTokenIdentifier> blockToken, long requestedNumBytes, BlockChecksumOptions blockChecksumOptions) throws IOException {
        ExtendedBlock block = stripedBlockInfo.getBlock();
        this.updateCurrentThreadName("Getting checksum for block group" + block);
        DataOutputStream out = new DataOutputStream(this.getOutputStream());
        this.checkAccess(out, true, block, blockToken, Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
        BlockChecksumHelper.BlockGroupNonStripedChecksumComputer maker = new BlockChecksumHelper.BlockGroupNonStripedChecksumComputer(this.datanode, stripedBlockInfo, requestedNumBytes, blockChecksumOptions);
        try {
            ((BlockChecksumHelper.AbstractBlockChecksumComputer)maker).compute();
            DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setChecksumResponse(DataTransferProtos.OpBlockChecksumResponseProto.newBuilder().setBytesPerCrc(maker.getBytesPerCRC()).setCrcPerBlock(maker.getCrcPerBlock()).setBlockChecksum(ByteString.copyFrom((byte[])maker.getOutBytes())).setCrcType(PBHelperClient.convert((DataChecksum.Type)maker.getCrcType())).setBlockChecksumOptions(PBHelperClient.convert((BlockChecksumOptions)blockChecksumOptions))).build().writeDelimitedTo((OutputStream)out);
            out.flush();
        }
        catch (IOException ioe) {
            LOG.info("blockChecksum {} received exception {}", (Object)stripedBlockInfo.getBlock(), (Object)ioe.toString());
            this.incrDatanodeNetworkErrors();
            throw ioe;
        }
        finally {
            IOUtils.closeStream((Closeable)out);
        }
        this.datanode.metrics.addBlockChecksumOp(this.elapsed());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void copyBlock(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken) throws IOException {
        BlockSender blockSender;
        DataOutputStream reply;
        block11: {
            this.updateCurrentThreadName("Copying block " + block);
            reply = this.getBufferedOutputStream();
            this.checkAccess(reply, true, block, blockToken, Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
            if (this.datanode.data.getPinning(block)) {
                String msg = "Not able to copy block " + block.getBlockId() + " to " + this.peer.getRemoteAddressString() + " because it's pinned ";
                LOG.info(msg);
                this.sendResponse(DataTransferProtos.Status.ERROR_BLOCK_PINNED, msg);
                return;
            }
            if (!this.dataXceiverServer.balanceThrottler.acquire()) {
                String msg = "Not able to copy block " + block.getBlockId() + " to " + this.peer.getRemoteAddressString() + " because threads quota=" + this.dataXceiverServer.balanceThrottler.getMaxConcurrentMovers() + " is exceeded.";
                LOG.info(msg);
                this.sendResponse(DataTransferProtos.Status.ERROR, msg);
                return;
            }
            blockSender = null;
            boolean isOpSuccess = true;
            try {
                blockSender = new BlockSender(block, 0L, -1L, false, false, true, this.datanode, null, CachingStrategy.newDropBehind());
                OutputStream baseStream2332 = this.getOutputStream();
                this.writeSuccessWithChecksumInfo(blockSender, reply);
                long beginRead = Time.monotonicNow();
                long read = blockSender.sendBlock(reply, baseStream2332, this.dataXceiverServer.balanceThrottler);
                long duration = Time.monotonicNow() - beginRead;
                this.datanode.metrics.incrBytesRead((int)read);
                this.datanode.metrics.incrBlocksRead();
                this.datanode.metrics.incrTotalReadTime(duration);
                LOG.info("Copied {} to {}", (Object)block, (Object)this.peer.getRemoteAddressString());
                this.dataXceiverServer.balanceThrottler.release();
                if (!isOpSuccess) break block11;
            }
            catch (IOException ioe) {
                try {
                    isOpSuccess = false;
                    LOG.info("opCopyBlock {} received exception {}", (Object)block, (Object)ioe.toString());
                    this.incrDatanodeNetworkErrors();
                    this.datanode.handleBadBlock(block, ioe, false);
                    throw ioe;
                }
                catch (Throwable throwable) {
                    this.dataXceiverServer.balanceThrottler.release();
                    if (isOpSuccess) {
                        try {
                            reply.writeChar(100);
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    }
                    IOUtils.closeStream((Closeable)reply);
                    IOUtils.closeStream(blockSender);
                    throw throwable;
                }
            }
            try {
                reply.writeChar(100);
            }
            catch (IOException baseStream2332) {
                // empty catch block
            }
        }
        IOUtils.closeStream((Closeable)reply);
        IOUtils.closeStream((Closeable)blockSender);
        this.datanode.metrics.addCopyBlockOp(this.elapsed());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void replaceBlock(ExtendedBlock block, StorageType storageType, Token<BlockTokenIdentifier> blockToken, String delHint, DatanodeInfo proxySource, String storageId) throws IOException {
        DataInputStream proxyReply;
        String errMsg;
        DataTransferProtos.Status opStatus;
        DataOutputStream proxyOut;
        DataOutputStream replyOut;
        block19: {
            this.updateCurrentThreadName("Replacing block " + block + " from " + delHint);
            replyOut = new DataOutputStream(this.getOutputStream());
            this.checkAccess(replyOut, true, block, blockToken, Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE, new StorageType[]{storageType}, new String[]{storageId});
            if (!this.dataXceiverServer.balanceThrottler.acquire()) {
                String msg = "Not able to receive block " + block.getBlockId() + " from " + this.peer.getRemoteAddressString() + " because threads quota=" + this.dataXceiverServer.balanceThrottler.getMaxConcurrentMovers() + " is exceeded.";
                LOG.warn(msg);
                this.sendResponse(DataTransferProtos.Status.ERROR, msg);
                return;
            }
            Socket proxySock = null;
            proxyOut = null;
            opStatus = DataTransferProtos.Status.SUCCESS;
            errMsg = null;
            proxyReply = null;
            boolean IoeDuringCopyBlockOperation = false;
            try {
                if (proxySource.equals((Object)this.datanode.getDatanodeId())) {
                    ReplicaInfo oldReplica = this.datanode.data.moveBlockAcrossStorage(block, storageType, storageId);
                    if (oldReplica != null) {
                        LOG.info("Moved {} from StorageType {} to {}", new Object[]{block, oldReplica.getVolume().getStorageType(), storageType});
                    }
                } else {
                    block.setNumBytes(this.dataXceiverServer.estimateBlockSize);
                    String dnAddr = proxySource.getXferAddr(this.connectToDnViaHostname);
                    LOG.debug("Connecting to datanode {}", (Object)dnAddr);
                    InetSocketAddress proxyAddr = NetUtils.createSocketAddr((String)dnAddr);
                    proxySock = this.datanode.newSocket();
                    NetUtils.connect((Socket)proxySock, (SocketAddress)proxyAddr, (int)this.dnConf.socketTimeout);
                    proxySock.setTcpNoDelay(this.dnConf.getDataTransferServerTcpNoDelay());
                    proxySock.setSoTimeout(this.dnConf.socketTimeout);
                    proxySock.setKeepAlive(true);
                    OutputStream unbufProxyOut = NetUtils.getOutputStream((Socket)proxySock, (long)this.dnConf.socketWriteTimeout);
                    Object unbufProxyIn = NetUtils.getInputStream((Socket)proxySock);
                    DataEncryptionKeyFactory keyFactory = this.datanode.getDataEncryptionKeyFactoryForBlock(block);
                    IOStreamPair saslStreams = this.datanode.saslClient.socketSend(proxySock, unbufProxyOut, (InputStream)unbufProxyIn, keyFactory, blockToken, (DatanodeID)proxySource);
                    unbufProxyOut = saslStreams.out;
                    unbufProxyIn = saslStreams.in;
                    proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, this.smallBufferSize));
                    proxyReply = new DataInputStream(new BufferedInputStream((InputStream)unbufProxyIn, this.ioFileBufferSize));
                    IoeDuringCopyBlockOperation = true;
                    new Sender(proxyOut).copyBlock(block, blockToken);
                    IoeDuringCopyBlockOperation = false;
                    DataTransferProtos.BlockOpResponseProto copyResponse = DataTransferProtos.BlockOpResponseProto.parseFrom((InputStream)PBHelperClient.vintPrefixed((InputStream)proxyReply));
                    String logInfo = "copy block " + block + " from " + proxySock.getRemoteSocketAddress();
                    DataTransferProtoUtil.checkBlockOpStatus((DataTransferProtos.BlockOpResponseProto)copyResponse, (String)logInfo, (boolean)true);
                    DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
                    DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto((DataTransferProtos.ChecksumProto)checksumInfo.getChecksum());
                    this.setCurrentBlockReceiver(this.getBlockReceiver(block, storageType, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0L, 0L, 0L, "", null, this.datanode, remoteChecksum, CachingStrategy.newDropBehind(), false, false, storageId));
                    this.blockReceiver.receiveBlock(null, null, replyOut, null, this.dataXceiverServer.balanceThrottler, null, true);
                    Replica r = this.blockReceiver.getReplica();
                    this.datanode.notifyNamenodeReceivedBlock(block, delHint, r.getStorageUuid(), r.isOnTransientStorage());
                    LOG.info("Moved {} from {}, delHint={}", new Object[]{block, this.peer.getRemoteAddressString(), delHint});
                }
                if (opStatus != DataTransferProtos.Status.SUCCESS || proxyReply == null) break block19;
            }
            catch (IOException ioe) {
                try {
                    opStatus = DataTransferProtos.Status.ERROR;
                    if (ioe instanceof BlockPinningException) {
                        opStatus = DataTransferProtos.Status.ERROR_BLOCK_PINNED;
                    }
                    errMsg = "opReplaceBlock " + block + " received exception " + ioe;
                    LOG.info(errMsg);
                    if (!IoeDuringCopyBlockOperation) {
                        this.incrDatanodeNetworkErrors();
                    }
                    throw ioe;
                }
                catch (Throwable throwable) {
                    if (opStatus == DataTransferProtos.Status.SUCCESS && proxyReply != null) {
                        try {
                            proxyReply.readChar();
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    }
                    this.dataXceiverServer.balanceThrottler.release();
                    try {
                        this.sendResponse(opStatus, errMsg);
                    }
                    catch (IOException ioe2) {
                        LOG.warn("Error writing reply back to {}", (Object)this.peer.getRemoteAddressString());
                        this.incrDatanodeNetworkErrors();
                    }
                    IOUtils.closeStream(proxyOut);
                    IOUtils.closeStream((Closeable)this.blockReceiver);
                    IOUtils.closeStream((Closeable)proxyReply);
                    IOUtils.closeStream((Closeable)replyOut);
                    throw throwable;
                }
            }
            try {
                proxyReply.readChar();
            }
            catch (IOException dnAddr) {
                // empty catch block
            }
        }
        this.dataXceiverServer.balanceThrottler.release();
        try {
            this.sendResponse(opStatus, errMsg);
        }
        catch (IOException ioe) {
            LOG.warn("Error writing reply back to {}", (Object)this.peer.getRemoteAddressString());
            this.incrDatanodeNetworkErrors();
        }
        IOUtils.closeStream(proxyOut);
        IOUtils.closeStream((Closeable)this.blockReceiver);
        IOUtils.closeStream((Closeable)proxyReply);
        IOUtils.closeStream((Closeable)replyOut);
        this.datanode.metrics.addReplaceBlockOp(this.elapsed());
    }

    @VisibleForTesting
    BlockReceiver getBlockReceiver(ExtendedBlock block, StorageType storageType, DataInputStream in, String inAddr, String myAddr, BlockConstructionStage stage, long newGs, long minBytesRcvd, long maxBytesRcvd, String clientname, DatanodeInfo srcDataNode, DataNode dn, DataChecksum requestedChecksum, CachingStrategy cachingStrategy, boolean allowLazyPersist, boolean pinning, String storageId) throws IOException {
        return new BlockReceiver(block, storageType, in, inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, dn, requestedChecksum, cachingStrategy, allowLazyPersist, pinning, storageId);
    }

    @VisibleForTesting
    DataOutputStream getBufferedOutputStream() {
        return new DataOutputStream(new BufferedOutputStream(this.getOutputStream(), this.smallBufferSize));
    }

    private long elapsed() {
        return Time.monotonicNow() - this.opStartTime;
    }

    private void sendResponse(DataTransferProtos.Status status, String message) throws IOException {
        DataXceiver.writeResponse(status, message, this.getOutputStream());
    }

    private static void writeResponse(DataTransferProtos.Status status, String message, OutputStream out) throws IOException {
        DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(status);
        if (message != null) {
            response.setMessage(message);
        }
        response.build().writeDelimitedTo(out);
        out.flush();
    }

    private void writeSuccessWithChecksumInfo(BlockSender blockSender, DataOutputStream out) throws IOException {
        DataTransferProtos.ReadOpChecksumInfoProto ckInfo = DataTransferProtos.ReadOpChecksumInfoProto.newBuilder().setChecksum(DataTransferProtoUtil.toProto((DataChecksum)blockSender.getChecksum())).setChunkOffset(blockSender.getOffset()).build();
        DataTransferProtos.BlockOpResponseProto response = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setReadOpChecksumInfo(ckInfo).build();
        response.writeDelimitedTo((OutputStream)out);
        out.flush();
    }

    private void incrDatanodeNetworkErrors() {
        this.datanode.incrDatanodeNetworkErrors(this.remoteAddressWithoutPort);
    }

    void checkAndWaitForBP(ExtendedBlock block) throws IOException {
        String bpId = block.getBlockPoolId();
        try {
            this.datanode.getDNRegistrationForBP(bpId);
            return;
        }
        catch (IOException iOException) {
            long bpReadyTimeout = this.dnConf.getBpReadyTimeout();
            StopWatch sw = new StopWatch();
            sw.start();
            while (sw.now(TimeUnit.SECONDS) <= bpReadyTimeout) {
                try {
                    this.datanode.getDNRegistrationForBP(bpId);
                    return;
                }
                catch (IOException iOException2) {
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException ie) {
                        throw new IOException("Interrupted while serving request. Aborting.");
                    }
                }
            }
            throw new IOException("Not ready to serve the block pool, " + bpId + ".");
        }
    }

    private void checkAccess(OutputStream out, boolean reply, ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op, BlockTokenIdentifier.AccessMode mode) throws IOException {
        this.checkAccess(out, reply, blk, t, op, mode, null, null);
    }

    private void checkAccess(OutputStream out, boolean reply, ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op, BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes, String[] storageIds) throws IOException {
        this.checkAndWaitForBP(blk);
        if (this.datanode.isBlockTokenEnabled) {
            LOG.debug("Checking block access token for block '{}' with mode '{}'", (Object)blk.getBlockId(), (Object)mode);
            try {
                this.datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode, storageTypes, storageIds);
            }
            catch (SecretManager.InvalidToken e) {
                try {
                    if (reply) {
                        DataTransferProtos.BlockOpResponseProto.Builder resp = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.ERROR_ACCESS_TOKEN);
                        if (mode == BlockTokenIdentifier.AccessMode.WRITE) {
                            DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(blk.getBlockPoolId());
                            resp.setFirstBadLink(dnR.getXferAddr());
                        }
                        resp.build().writeDelimitedTo(out);
                        out.flush();
                    }
                    LOG.warn("Block token verification failed: op={}, remoteAddress={}, message={}", new Object[]{op, this.remoteAddress, e.getLocalizedMessage()});
                    throw e;
                }
                catch (Throwable throwable) {
                    IOUtils.closeStream((Closeable)out);
                    throw throwable;
                }
            }
        }
    }
}

