package org.apache.hadoop.hdfs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:lib/hadoop-hdfs-2.7.0-mapr-1803-r1-tests.jar:org/apache/hadoop/hdfs/TestDataTransferProtocol.class */
public class TestDataTransferProtocol {
    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.hdfs.TestDataTransferProtocol");
    private static final DataChecksum DEFAULT_CHECKSUM = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    DatanodeID datanode;
    InetSocketAddress dnAddr;
    final ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
    final DataOutputStream sendOut = new DataOutputStream(this.sendBuf);
    final Sender sender = new Sender(this.sendOut);
    final ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
    final DataOutputStream recvOut = new DataOutputStream(this.recvBuf);

    private void sendRecvData(String str, boolean z) throws IOException {
        Socket socket = null;
        if (str != null) {
            try {
                LOG.info("Testing : " + str);
            } catch (Throwable th) {
                IOUtils.closeSocket(socket);
                throw th;
            }
        }
        LOG.info("Going to write:" + StringUtils.byteToHexString(this.sendBuf.toByteArray()));
        socket = new Socket();
        socket.connect(this.dnAddr, 60000);
        socket.setSoTimeout(60000);
        OutputStream outputStream = socket.getOutputStream();
        byte[] bArr = new byte[this.recvBuf.size()];
        DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
        outputStream.write(this.sendBuf.toByteArray());
        outputStream.flush();
        try {
            dataInputStream.readFully(bArr);
            String byteToHexString = StringUtils.byteToHexString(bArr);
            String byteToHexString2 = StringUtils.byteToHexString(this.recvBuf.toByteArray());
            LOG.info("Received: " + byteToHexString);
            LOG.info("Expected: " + byteToHexString2);
            if (z) {
                throw new IOException("Did not recieve IOException when an exception is expected while reading from " + this.datanode);
            }
            Assert.assertEquals(byteToHexString2, byteToHexString);
            IOUtils.closeSocket(socket);
        } catch (EOFException e) {
            if (!z) {
                throw e;
            }
            LOG.info("Got EOF as expected.");
            IOUtils.closeSocket(socket);
        }
    }

    void createFile(FileSystem fileSystem, Path path, int i) throws IOException {
        FSDataOutputStream create = fileSystem.create(path);
        create.write(new byte[i]);
        create.close();
    }

    void readFile(FileSystem fileSystem, Path path, int i) throws IOException {
        fileSystem.open(path).readFully(new byte[i]);
    }

    private void writeZeroLengthPacket(ExtendedBlock extendedBlock, String str) throws IOException {
        new PacketHeader(8, extendedBlock.getNumBytes(), 100L, true, 0, false).write(this.sendOut);
        this.sendOut.writeInt(0);
        sendResponse(DataTransferProtos.Status.SUCCESS, "", null, this.recvOut);
        new PipelineAck(100L, new int[]{PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, DataTransferProtos.Status.SUCCESS)}).write(this.recvOut);
        sendRecvData(str, false);
    }

    private void sendResponse(DataTransferProtos.Status status, String str, String str2, DataOutputStream dataOutputStream) throws IOException {
        DataTransferProtos.BlockOpResponseProto.Builder status2 = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(status);
        if (str != null) {
            status2.setFirstBadLink(str);
        }
        if (str2 != null) {
            status2.setMessage(str2);
        }
        status2.build().writeDelimitedTo(dataOutputStream);
    }

    private void testWrite(ExtendedBlock extendedBlock, BlockConstructionStage blockConstructionStage, long j, String str, Boolean bool) throws IOException {
        this.sendBuf.reset();
        this.recvBuf.reset();
        writeBlock(extendedBlock, blockConstructionStage, j, DEFAULT_CHECKSUM);
        if (bool.booleanValue()) {
            sendResponse(DataTransferProtos.Status.ERROR, null, null, this.recvOut);
            sendRecvData(str, true);
        } else if (blockConstructionStage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
            writeZeroLengthPacket(extendedBlock, str);
        } else {
            sendResponse(DataTransferProtos.Status.SUCCESS, "", null, this.recvOut);
            sendRecvData(str, false);
        }
    }

    @Test
    public void testOpWrite() throws IOException {
        MiniDFSCluster build = new MiniDFSCluster.Builder(new HdfsConfiguration()).numDataNodes(1).build();
        try {
            build.waitActive();
            this.datanode = DataNodeTestUtils.getDNRegistrationForBP(build.getDataNodes().get(0), build.getNamesystem().getBlockPoolId());
            this.dnAddr = NetUtils.createSocketAddr(this.datanode.getXferAddr());
            DistributedFileSystem fileSystem = build.getFileSystem();
            Path path = new Path("dataprotocol.dat");
            DFSTestUtil.createFile(fileSystem, path, 1L, (short) 1, 0L);
            ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSystem, path);
            testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Cannot create an existing block", true);
            testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING, 0L, "Unexpected stage", true);
            testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, firstBlock.getGenerationStamp() + 1, "Cannot recover data streaming to a finalized replica", true);
            long generationStamp = firstBlock.getGenerationStamp() + 1;
            testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND, generationStamp, "Append to a finalized replica", false);
            firstBlock.setGenerationStamp(generationStamp);
            Path path2 = new Path("dataprotocol1.dat");
            DFSTestUtil.createFile(fileSystem, path2, 1L, (short) 1, 0L);
            ExtendedBlock firstBlock2 = DFSTestUtil.getFirstBlock(fileSystem, path2);
            testWrite(firstBlock2, BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, firstBlock2.getGenerationStamp() + 1, "Recover appending to a finalized replica", false);
            Path path3 = new Path("dataprotocol2.dat");
            DFSTestUtil.createFile(fileSystem, path3, 1L, (short) 1, 0L);
            ExtendedBlock firstBlock3 = DFSTestUtil.getFirstBlock(fileSystem, path3);
            long generationStamp2 = firstBlock3.getGenerationStamp() + 1;
            testWrite(firstBlock3, BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, generationStamp2, "Recover failed close to a finalized replica", false);
            firstBlock3.setGenerationStamp(generationStamp2);
            ExtendedBlock extendedBlock = new ExtendedBlock(firstBlock3.getBlockPoolId(), firstBlock3.getBlockId() + 128, 0L, firstBlock3.getGenerationStamp());
            testWrite(extendedBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Create a new block", false);
            long generationStamp3 = extendedBlock.getGenerationStamp() + 1;
            extendedBlock.setBlockId(extendedBlock.getBlockId() + 1);
            testWrite(extendedBlock, BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, generationStamp3, "Recover a new block", true);
            testWrite(extendedBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND, extendedBlock.getGenerationStamp() + 1, "Cannot append to a new block", true);
            extendedBlock.setBlockId(extendedBlock.getBlockId() + 1);
            testWrite(extendedBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, extendedBlock.getGenerationStamp() + 1, "Cannot append to a new block", true);
            Path path4 = new Path("dataprotocol1.dat");
            DFSTestUtil.createFile(fileSystem, path4, 1L, (short) 1, 0L);
            DFSOutputStream dFSOutputStream = (DFSOutputStream) fileSystem.append(path4).getWrappedStream();
            dFSOutputStream.write(1);
            dFSOutputStream.hflush();
            FSDataInputStream open = fileSystem.open(path4);
            ExtendedBlock block = DFSTestUtil.getAllBlocks(open).get(0).getBlock();
            block.setNumBytes(2L);
            try {
                testWrite(block, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, "Cannot create a RBW block", true);
                long generationStamp4 = block.getGenerationStamp() + 1;
                testWrite(block, BlockConstructionStage.PIPELINE_SETUP_APPEND, generationStamp4, "Cannot append to a RBW replica", true);
                testWrite(block, BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, generationStamp4, "Recover append to a RBW replica", false);
                block.setGenerationStamp(generationStamp4);
                Path path5 = new Path("dataprotocol2.dat");
                DFSTestUtil.createFile(fileSystem, path5, 1L, (short) 1, 0L);
                dFSOutputStream = (DFSOutputStream) fileSystem.append(path5).getWrappedStream();
                dFSOutputStream.write(1);
                dFSOutputStream.hflush();
                open = fileSystem.open(path5);
                ExtendedBlock block2 = DFSTestUtil.getAllBlocks(open).get(0).getBlock();
                block2.setNumBytes(2L);
                testWrite(block2, BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, block2.getGenerationStamp() + 1, "Recover a RBW replica", false);
                IOUtils.closeStream(open);
                IOUtils.closeStream(dFSOutputStream);
            } catch (Throwable th) {
                IOUtils.closeStream(open);
                IOUtils.closeStream(dFSOutputStream);
                throw th;
            }
        } finally {
            build.shutdown();
        }
    }

    @Test
    public void testDataTransferProtocol() throws IOException {
        Random random = new Random();
        Path path = new Path("dataprotocol.dat");
        HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
        hdfsConfiguration.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
        MiniDFSCluster build = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(1).build();
        try {
            build.waitActive();
            this.datanode = build.getFileSystem().getDataNodeStats(HdfsConstants.DatanodeReportType.LIVE)[0];
            this.dnAddr = NetUtils.createSocketAddr(this.datanode.getXferAddr());
            DistributedFileSystem fileSystem = build.getFileSystem();
            int min = Math.min(hdfsConfiguration.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
            createFile(fileSystem, path, min);
            ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSystem, path);
            String blockPoolId = firstBlock.getBlockPoolId();
            long blockId = firstBlock.getBlockId() + 1;
            this.recvBuf.reset();
            this.sendBuf.reset();
            this.recvOut.writeShort(27);
            this.sendOut.writeShort(27);
            sendRecvData("Wrong Version", true);
            this.sendBuf.reset();
            this.sendOut.writeShort(28);
            this.sendOut.writeByte(Op.WRITE_BLOCK.code - 1);
            sendRecvData("Wrong Op Code", true);
            this.sendBuf.reset();
            DataChecksum dataChecksum = (DataChecksum) Mockito.spy(DEFAULT_CHECKSUM);
            ((DataChecksum) Mockito.doReturn(-1).when(dataChecksum)).getBytesPerChecksum();
            writeBlock(blockPoolId, blockId, dataChecksum);
            this.recvBuf.reset();
            sendResponse(DataTransferProtos.Status.ERROR, null, null, this.recvOut);
            sendRecvData("wrong bytesPerChecksum while writing", true);
            this.sendBuf.reset();
            this.recvBuf.reset();
            long j = blockId + 1;
            writeBlock(blockPoolId, j, DEFAULT_CHECKSUM);
            new PacketHeader(4, 0L, 100L, false, (-1) - random.nextInt(1048576), false).write(this.sendOut);
            sendResponse(DataTransferProtos.Status.SUCCESS, "", null, this.recvOut);
            new PipelineAck(100L, new int[]{PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, DataTransferProtos.Status.ERROR)}).write(this.recvOut);
            sendRecvData("negative DATA_CHUNK len while writing block " + j, true);
            this.sendBuf.reset();
            this.recvBuf.reset();
            long j2 = j + 1;
            writeBlock(blockPoolId, j2, DEFAULT_CHECKSUM);
            new PacketHeader(8, 0L, 100L, true, 0, false).write(this.sendOut);
            this.sendOut.writeInt(0);
            this.sendOut.flush();
            sendResponse(DataTransferProtos.Status.SUCCESS, "", null, this.recvOut);
            new PipelineAck(100L, new int[]{PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, DataTransferProtos.Status.SUCCESS)}).write(this.recvOut);
            sendRecvData("Writing a zero len block blockid " + j2, false);
            ExtendedBlock extendedBlock = new ExtendedBlock(build.getNamesystem().getBlockPoolId(), firstBlock.getLocalBlock());
            long blockId2 = extendedBlock.getBlockId();
            this.sendBuf.reset();
            this.recvBuf.reset();
            extendedBlock.setBlockId(blockId2 - 1);
            this.sender.readBlock(extendedBlock, BlockTokenSecretManager.DUMMY_TOKEN, "cl", 0L, min, true, CachingStrategy.newDefaultStrategy());
            sendRecvData("Wrong block ID " + j2 + " for read", false);
            this.sendBuf.reset();
            extendedBlock.setBlockId(blockId2);
            this.sender.readBlock(extendedBlock, BlockTokenSecretManager.DUMMY_TOKEN, "cl", -1L, min, true, CachingStrategy.newDefaultStrategy());
            sendRecvData("Negative start-offset for read for block " + firstBlock.getBlockId(), false);
            this.sendBuf.reset();
            this.sender.readBlock(extendedBlock, BlockTokenSecretManager.DUMMY_TOKEN, "cl", min, min, true, CachingStrategy.newDefaultStrategy());
            sendRecvData("Wrong start-offset for reading block " + firstBlock.getBlockId(), false);
            this.recvBuf.reset();
            DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setReadOpChecksumInfo(DataTransferProtos.ReadOpChecksumInfoProto.newBuilder().setChecksum(DataTransferProtoUtil.toProto(DEFAULT_CHECKSUM)).setChunkOffset(0L)).build().writeDelimitedTo(this.recvOut);
            this.sendBuf.reset();
            this.sender.readBlock(extendedBlock, BlockTokenSecretManager.DUMMY_TOKEN, "cl", 0L, (-1) - random.nextInt(1048576), true, CachingStrategy.newDefaultStrategy());
            sendRecvData("Negative length for reading block " + firstBlock.getBlockId(), false);
            this.recvBuf.reset();
            sendResponse(DataTransferProtos.Status.ERROR, null, "opReadBlock " + firstBlock + " received exception java.io.IOException:  Offset 0 and length 4097 don't match block " + firstBlock + " ( blockLen 4096 )", this.recvOut);
            this.sendBuf.reset();
            this.sender.readBlock(extendedBlock, BlockTokenSecretManager.DUMMY_TOKEN, "cl", 0L, min + 1, true, CachingStrategy.newDefaultStrategy());
            sendRecvData("Wrong length for reading block " + firstBlock.getBlockId(), false);
            this.sendBuf.reset();
            this.sender.readBlock(extendedBlock, BlockTokenSecretManager.DUMMY_TOKEN, "cl", 0L, min, true, CachingStrategy.newDefaultStrategy());
            readFile(fileSystem, path, min);
            build.shutdown();
        } catch (Throwable th) {
            build.shutdown();
            throw th;
        }
    }

    @Test
    public void testPacketHeader() throws IOException {
        PacketHeader packetHeader = new PacketHeader(4, FileUtils.ONE_KB, 100L, false, 4096, false);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        packetHeader.write(new DataOutputStream(byteArrayOutputStream));
        PacketHeader packetHeader2 = new PacketHeader();
        packetHeader2.readFields(new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())));
        Assert.assertEquals(packetHeader, packetHeader2);
        PacketHeader packetHeader3 = new PacketHeader();
        packetHeader3.readFields(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
        Assert.assertEquals(packetHeader, packetHeader3);
        Assert.assertTrue(packetHeader.sanityCheck(99L));
        Assert.assertFalse(packetHeader.sanityCheck(100L));
    }

    @Test
    public void TestPipeLineAckCompatibility() throws IOException {
        DataTransferProtos.PipelineAckProto build = DataTransferProtos.PipelineAckProto.newBuilder().setSeqno(0L).addReply(DataTransferProtos.Status.CHECKSUM_OK).build();
        DataTransferProtos.PipelineAckProto build2 = DataTransferProtos.PipelineAckProto.newBuilder().mergeFrom(build).addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, DataTransferProtos.Status.CHECKSUM_OK)).build();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        build.writeDelimitedTo(byteArrayOutputStream);
        new PipelineAck().readFields(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
        Assert.assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, DataTransferProtos.Status.CHECKSUM_OK), r0.getHeaderFlag(0));
        PipelineAck pipelineAck = new PipelineAck();
        ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
        build2.writeDelimitedTo(byteArrayOutputStream2);
        pipelineAck.readFields(new ByteArrayInputStream(byteArrayOutputStream2.toByteArray()));
        Assert.assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, DataTransferProtos.Status.CHECKSUM_OK), pipelineAck.getHeaderFlag(0));
    }

    void writeBlock(String str, long j, DataChecksum dataChecksum) throws IOException {
        writeBlock(new ExtendedBlock(str, j), BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, dataChecksum);
    }

    void writeBlock(ExtendedBlock extendedBlock, BlockConstructionStage blockConstructionStage, long j, DataChecksum dataChecksum) throws IOException {
        this.sender.writeBlock(extendedBlock, StorageType.DEFAULT, BlockTokenSecretManager.DUMMY_TOKEN, "cl", new DatanodeInfo[1], new StorageType[1], null, blockConstructionStage, 0, extendedBlock.getNumBytes(), extendedBlock.getNumBytes(), j, dataChecksum, CachingStrategy.newDefaultStrategy(), false, false, null);
    }
}
