package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:lib/hadoop-hdfs-2.4.1-mapr-1408-SNAPSHOT-tests.jar:org/apache/hadoop/hdfs/TestReplication.class */
public class TestReplication {
    /** Fixed seed for the {@link Random} that generates file contents in {@code writeFile}. */
    private static final long seed = 3735928559L;
    /** 8192; the same value {@code writeFile} passes as the block size when it creates a file. */
    private static final int blockSize = 8192;
    /** 16384 (twice {@code blockSize}); the same value {@code writeFile} uses as the payload length. */
    private static final int fileSize = 16384;
    /**
     * Rack names handed to the cluster builder in {@code runReplication}, one entry per datanode.
     * {@code checkFile} also requires every reported topology path to start with one of these.
     */
    private static final String[] racks = {"/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"};
    /** Number of datanodes started by {@code runReplication}: one per entry in {@code racks}. */
    private static final int numDatanodes = racks.length;
    /** Logger used by the tests and helpers in this class. */
    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.hdfs.TestReplication");

    /**
     * Creates {@code path} (overwriting any existing file) with the requested replication
     * factor and a block size of {@code blockSize}, then writes {@code fileSize} pseudo-random
     * bytes generated from {@code seed}.
     *
     * @param fileSystem file system on which the file is created
     * @param path file to create
     * @param i replication factor; narrowed to {@code short} for the create call
     * @throws IOException if creating, writing or closing the file fails
     */
    private void writeFile(FileSystem fileSystem, Path path, int i) throws IOException {
        int bufferSize = fileSystem.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 4096);
        byte[] contents = new byte[fileSize];
        new Random(seed).nextBytes(contents);
        // try-with-resources guarantees the stream is closed even when write() throws.
        try (FSDataOutputStream out = fileSystem.create(path, true, bufferSize, (short) i, (long) blockSize)) {
            out.write(contents);
        }
    }

    /**
     * Waits until every block of {@code path} has the expected number of replicas and then
     * verifies replica placement.
     *
     * <p>Checks performed:
     * <ol>
     *   <li>the file system reports as many block locations as the namenode, each with one
     *       topology path per replica, and every topology path starts with an entry of
     *       {@code racks};</li>
     *   <li>for a block with exactly two replicas, the replicas are on different racks;</li>
     *   <li>for a block with more than two replicas, among all replicas but the last at least
     *       one shares its rack with a later replica and at least one does not.</li>
     * </ol>
     * Placement checking stops at the first block that has one replica or none.
     *
     * <p>Every block is asserted individually, so a misplaced two-replica block can no longer
     * be masked by a later, correctly placed block.
     *
     * @param fileSystem file system holding the file
     * @param path file to check
     * @param i requested replication factor; the wait is capped at {@code numDatanodes}
     * @throws IOException if a namenode or file system call fails
     */
    private void checkFile(FileSystem fileSystem, Path path, int i) throws IOException {
        ClientProtocol namenode = (ClientProtocol) NameNodeProxies.createProxy(fileSystem.getConf(), fileSystem.getUri(), ClientProtocol.class).getProxy();
        String fileName = path.toString();
        // A block can never have more replicas than there are datanodes.
        waitForBlockReplication(fileName, namenode, Math.min(numDatanodes, i), -1L);

        LocatedBlocks located = namenode.getBlockLocations(fileName, 0L, Long.MAX_VALUE);
        BlockLocation[] reported = fileSystem.getFileBlockLocations(fileSystem.getFileStatus(path), 0L, Long.MAX_VALUE);

        // The rack information reported to clients must match the namenode's view.
        Assert.assertEquals("Number of block locations for " + fileName, located.locatedBlockCount(), reported.length);
        for (int blockIndex = 0; blockIndex < reported.length; blockIndex++) {
            DatanodeInfo[] replicas = located.get(blockIndex).getLocations();
            String[] topologyPaths = reported[blockIndex].getTopologyPaths();
            Assert.assertEquals("Number of topology paths for block " + blockIndex, replicas.length, topologyPaths.length);
            for (String topologyPath : topologyPaths) {
                Assert.assertTrue("Topology path " + topologyPath + " is not on a configured rack", startsWithConfiguredRack(topologyPath));
            }
        }

        for (LocatedBlock block : located.getLocatedBlocks()) {
            DatanodeInfo[] replicas = block.getLocations();
            if (replicas.length <= 1) {
                // Nothing to compare for a single replica; later blocks are not inspected either.
                break;
            }
            if (replicas.length == 2) {
                boolean sameRack = replicas[0].getNetworkLocation().equals(replicas[1].getNetworkLocation());
                Assert.assertTrue("Both replicas of " + block.getBlock() + " are on the same rack", !sameRack);
                continue;
            }

            boolean someOnSharedRack = false;
            boolean someOnOwnRack = false;
            for (int first = 0; first < replicas.length - 1 && !(someOnSharedRack && someOnOwnRack); first++) {
                LOG.info("datanode " + first + ": " + replicas[first]);
                boolean sharesRackWithLater = false;
                for (int second = first + 1; second < replicas.length; second++) {
                    if (replicas[first].getNetworkLocation().equals(replicas[second].getNetworkLocation())) {
                        sharesRackWithLater = true;
                    }
                }
                if (sharesRackWithLater) {
                    someOnSharedRack = true;
                } else {
                    someOnOwnRack = true;
                }
            }
            Assert.assertTrue("No two replicas of " + block.getBlock() + " share a rack", someOnSharedRack);
            Assert.assertTrue("All replicas of " + block.getBlock() + " share a rack with a later replica", someOnOwnRack);
        }
    }

    /** Returns whether {@code topologyPath} starts with one of the rack names in {@code racks}. */
    private static boolean startsWithConfiguredRack(String topologyPath) {
        for (String rack : racks) {
            if (topologyPath.startsWith(rack)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Asserts that {@code path} exists, deletes it recursively, and asserts that it no longer
     * exists afterwards.
     *
     * @param fileSystem file system holding the file
     * @param path file to remove
     * @throws IOException if an existence check or the delete call fails
     */
    private void cleanupFile(FileSystem fileSystem, Path path) throws IOException {
        boolean presentBefore = fileSystem.exists(path);
        Assert.assertTrue(presentBefore);

        fileSystem.delete(path, true);

        boolean presentAfter = fileSystem.exists(path);
        Assert.assertFalse(presentAfter);
    }

    /**
     * Creates a single-replica file on a two-datanode cluster, corrupts its first block on the
     * datanodes, raises the replication factor to 2, and then polls the namenode until the
     * block is reported as corrupt. At that point the block must still have exactly one
     * location.
     *
     * <p>The cluster and the client are released even when an assertion or call fails, and an
     * interrupt while polling aborts the test instead of being ignored.
     *
     * @throws Exception if cluster setup, a file system or namenode call fails, or the polling
     *     sleep is interrupted
     */
    @Test
    public void testBadBlockReportOnTransfer() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
        try {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            try (DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf)) {
                Path file = new Path("/tmp/testBadBlockReportOnTransfer/file1");
                DFSTestUtil.createFile(fs, file, FileUtils.ONE_KB, (short) 1, 0L);
                DFSTestUtil.waitReplication((FileSystem) fs, file, (short) 1);

                Assert.assertEquals("Corrupted too few blocks", 1, cluster.corruptBlockOnDataNodes(DFSTestUtil.getFirstBlock(fs, file)));

                // Asking for a second replica makes the namenode schedule a transfer of the block.
                fs.setReplication(file, (short) 2);

                LocatedBlocks blocks = dfsClient.getNamenode().getBlockLocations(file.toString(), 0L, Long.MAX_VALUE);
                while (!blocks.get(0).isCorrupt()) {
                    LOG.info("Waiting until block is marked as corrupt...");
                    Thread.sleep(1000L);
                    blocks = dfsClient.getNamenode().getBlockLocations(file.toString(), 0L, Long.MAX_VALUE);
                }
                Assert.assertEquals("Locations of the corrupt block", 1, blocks.get(0).getLocations().length);
            }
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Starts a cluster with one datanode per entry in {@code racks}, checks that all datanodes
     * are reported live, and then writes, verifies and deletes {@code /smallblocktest.dat}
     * with replication factors 3, 10, 4, 1 and 2 in that order.
     *
     * <p>The file system is closed and the cluster shut down whether or not the scenario
     * succeeds; the cluster is shut down even if closing the file system fails.
     *
     * @param z when {@code true}, {@link SimulatedFSDataset#setFactory} is applied to the
     *     configuration before the cluster is built
     * @throws IOException if cluster setup or any file system or namenode call fails
     */
    public void runReplication(boolean z) throws IOException {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
        if (z) {
            SimulatedFSDataset.setFactory(conf);
        }
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).racks(racks).build();
        try {
            cluster.waitActive();
            // The client is only needed for the datanode report, so close it right away.
            try (DFSClient client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf)) {
                Assert.assertEquals("Number of Datanodes ", numDatanodes, client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE).length);
            }
            DistributedFileSystem fs = cluster.getFileSystem();
            try {
                Path file = new Path("/smallblocktest.dat");
                for (int replication : new int[] {3, 10, 4, 1, 2}) {
                    writeFile(fs, file, replication);
                    checkFile(fs, file, replication);
                    cleanupFile(fs, file);
                }
            } finally {
                fs.close();
            }
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Runs the replication scenario of {@code runReplication} with the simulated-dataset flag
     * switched on.
     */
    @Test
    public void testReplicationSimulatedStorag() throws IOException {
        final boolean useSimulatedDataset = true;
        runReplication(useSimulatedDataset);
    }

    /**
     * Runs the replication scenario of {@code runReplication} with the simulated-dataset flag
     * switched off.
     */
    @Test
    public void testReplication() throws IOException {
        final boolean useSimulatedDataset = false;
        runReplication(useSimulatedDataset);
    }

    /**
     * Polls the namenode every 500 ms until every block of the file has at least {@code i}
     * locations.
     *
     * @param str path of the file whose blocks are checked
     * @param clientProtocol namenode proxy used to fetch block locations
     * @param i minimum number of locations each block must have
     * @param j timeout in seconds; a value of zero or less waits indefinitely
     * @throws IOException if a namenode call fails, the timeout elapses, or the waiting thread
     *     is interrupted (the interrupt flag is restored before throwing)
     */
    private void waitForBlockReplication(String str, ClientProtocol clientProtocol, int i, long j) throws IOException {
        long startTime = Time.now();
        LOG.info("Checking for block replication for " + str);
        while (true) {
            boolean fullyReplicated = true;
            for (LocatedBlock block : clientProtocol.getBlockLocations(str, 0L, Long.MAX_VALUE).getLocatedBlocks()) {
                int actual = block.getLocations().length;
                if (actual < i) {
                    LOG.info("Not enough replicas for " + block.getBlock() + " yet. Expecting " + i + ", got " + actual + ".");
                    fullyReplicated = false;
                    break;
                }
            }
            if (fullyReplicated) {
                return;
            }
            if (j > 0 && Time.now() - startTime > j * 1000) {
                throw new IOException("Timedout while waiting for all blocks to  be replicated for " + str);
            }
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                // Ignoring the interrupt would keep this loop spinning forever when no timeout
                // is set, so restore the flag and give up.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for all blocks to be replicated for " + str, e);
            }
        }
    }

    /**
     * Checks that a block regains its full replica count after replicas are damaged while the
     * cluster is down.
     *
     * <p>Steps: write a 1024-byte file with replication 4 on a four-datanode cluster and wait
     * for full replication; shut the cluster down; for datanode indexes 0 to 2, delete the
     * first block file found and overwrite 25 bytes, one third of the way in, of the other two;
     * restart without formatting using eight datanodes and a 2-second pending-replication
     * timeout; finally wait until the block has four locations again.
     *
     * <p>Whichever cluster and client are active are released in {@code finally}, so a failure
     * at any step no longer leaves a running cluster behind.
     *
     * @throws IOException if cluster setup, file access or a namenode call fails
     */
    @Test
    public void testPendingReplicationRetry() throws IOException {
        final int numDataNodes = 4;
        final String testFile = "/replication-test-file";
        Path testPath = new Path(testFile);

        byte[] buffer = new byte[1024];
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = (byte) '1';
        }

        MiniDFSCluster cluster = null;
        DFSClient dfsClient = null;
        try {
            HdfsConfiguration conf = new HdfsConfiguration();
            conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
            cluster.waitActive();
            dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);

            try (FSDataOutputStream out = cluster.getFileSystem().create(testPath)) {
                out.write(buffer);
            }
            waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1L);
            ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(testFile, 0L, Long.MAX_VALUE).get(0).getBlock();

            // Release the first cluster before touching its block files on disk.
            dfsClient.close();
            dfsClient = null;
            cluster.shutdown();
            cluster = null;

            // The first 25 bytes of the buffer become the corruption payload.
            for (int i = 0; i < 25; i++) {
                buffer[i] = (byte) '0';
            }

            int fileCount = 0;
            for (int dnIndex = 0; dnIndex < 3; dnIndex++) {
                File blockFile = MiniDFSCluster.getBlockFile(dnIndex, block);
                LOG.info("Checking for file " + blockFile);
                if (blockFile == null || !blockFile.exists()) {
                    continue;
                }
                if (fileCount == 0) {
                    LOG.info("Deleting file " + blockFile);
                    Assert.assertTrue(blockFile.delete());
                } else {
                    LOG.info("Corrupting file " + blockFile);
                    long length = blockFile.length();
                    Assert.assertTrue(length > 50);
                    try (RandomAccessFile raf = new RandomAccessFile(blockFile, "rw")) {
                        raf.seek(length / 3);
                        raf.write(buffer, 0, 25);
                    }
                }
                fileCount++;
            }
            Assert.assertEquals(3, fileCount);

            LOG.info("Restarting minicluster after deleting a replica and corrupting 2 crcs");
            conf = new HdfsConfiguration();
            conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
            conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
            conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
            conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.75f");

            // Restart on the existing storage (format(false)) with twice as many datanodes.
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes * 2).format(false).build();
            cluster.waitActive();
            dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);

            waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1L);
        } finally {
            try {
                if (dfsClient != null) {
                    dfsClient.close();
                }
            } finally {
                if (cluster != null) {
                    cluster.shutdown();
                }
            }
        }
    }

    /**
     * Runs {@code changeBlockLen} on a two-datanode cluster, first with a length delta of -1
     * and then with +1.
     *
     * <p>The cluster is shut down exactly once in {@code finally}, whether or not the scenario
     * succeeds.
     *
     * @throws Exception if cluster setup or either scenario fails
     */
    @Test
    public void testReplicateLenMismatchedBlock() throws Exception {
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).numDataNodes(2).build();
        try {
            cluster.waitActive();
            changeBlockLen(cluster, -1);
            changeBlockLen(cluster, 1);
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Creates {@code /file1} with a single replica, changes the on-disk length of one replica
     * of its first block by {@code i}, raises the replication factor to 2, and polls the
     * namenode every 100 ms for the expected outcome before deleting the file.
     *
     * <p>For a negative delta the wait ends once the block is reported corrupt with exactly
     * one location; otherwise it ends once the block has two locations. The file length is the
     * configured bytes-per-checksum value (default 512).
     *
     * <p>NOTE(review): the exact effect of {@code TestDatanodeBlockScanner.changeReplicaLength}
     * is not visible here; its boolean result is presumably "a replica was changed on this
     * datanode" - confirm.
     *
     * @param miniDFSCluster running cluster to operate on
     * @param i length delta passed to {@code changeReplicaLength}
     * @throws IOException if a file system or namenode call fails
     * @throws InterruptedException if interrupted while polling
     * @throws TimeoutException declared for {@code DFSTestUtil.waitReplication}
     */
    private void changeBlockLen(MiniDFSCluster miniDFSCluster, int i) throws IOException, InterruptedException, TimeoutException {
        Path file = new Path("/file1");
        DistributedFileSystem fs = miniDFSCluster.getFileSystem();
        int fileLen = fs.getConf().getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
        DFSTestUtil.createFile(fs, file, fileLen, (short) 1, 0L);
        DFSTestUtil.waitReplication((FileSystem) fs, file, (short) 1);

        // Try each datanode in turn and stop at the first one that reports a change.
        ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
        for (int dnIndex = 0; dnIndex < miniDFSCluster.getDataNodes().size(); dnIndex++) {
            if (TestDatanodeBlockScanner.changeReplicaLength(block, dnIndex, i)) {
                break;
            }
        }

        fs.setReplication(file, (short) 2);

        // The client is closed when polling ends so repeated calls do not accumulate clients.
        try (DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", miniDFSCluster.getNameNodePort()), fs.getConf())) {
            LocatedBlocks blocks = dfsClient.getNamenode().getBlockLocations(file.toString(), 0L, fileLen);
            if (i < 0) {
                while (!(blocks.get(0).isCorrupt() && blocks.get(0).getLocations().length == 1)) {
                    Thread.sleep(100L);
                    blocks = dfsClient.getNamenode().getBlockLocations(file.toString(), 0L, fileLen);
                }
            } else {
                while (blocks.get(0).getLocations().length != 2) {
                    Thread.sleep(100L);
                    blocks = dfsClient.getNamenode().getBlockLocations(file.toString(), 0L, fileLen);
                }
            }
        }
        fs.delete(file, true);
    }
}
