/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniHDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;

public class TestReplication {
    private static final long seed = 3735928559L;
    private static final int blockSize = 8192;
    private static final int fileSize = 16384;
    private static final String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"};
    private static final int numDatanodes = racks.length;
    private static final Log LOG = LogFactory.getLog((String)"org.apache.hadoop.hdfs.TestReplication");

    private void writeFile(FileSystem fileSys, Path name, int repl) throws IOException {
        FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf().getInt("io.file.buffer.size", 4096), (short)repl, 8192L);
        byte[] buffer = new byte[16384];
        Random rand = new Random(3735928559L);
        rand.nextBytes(buffer);
        stm.write(buffer);
        stm.close();
    }

    private void checkFile(FileSystem fileSys, Path name, int repl) throws IOException {
        LocatedBlock blk;
        DatanodeInfo[] datanodes;
        Configuration conf = fileSys.getConf();
        ClientProtocol namenode = (ClientProtocol)NameNodeProxies.createProxy((Configuration)conf, (URI)fileSys.getUri(), ClientProtocol.class).getProxy();
        this.waitForBlockReplication(name.toString(), namenode, Math.min(numDatanodes, repl), -1L);
        LocatedBlocks locations = namenode.getBlockLocations(name.toString(), 0L, Long.MAX_VALUE);
        FileStatus stat = fileSys.getFileStatus(name);
        BlockLocation[] blockLocations = fileSys.getFileBlockLocations(stat, 0L, Long.MAX_VALUE);
        Assert.assertTrue((blockLocations.length == locations.locatedBlockCount() ? 1 : 0) != 0);
        for (int i = 0; i < blockLocations.length; ++i) {
            LocatedBlock blk2 = locations.get(i);
            DatanodeInfo[] datanodes2 = blk2.getLocations();
            String[] topologyPaths = blockLocations[i].getTopologyPaths();
            Assert.assertTrue((topologyPaths.length == datanodes2.length ? 1 : 0) != 0);
            for (int j = 0; j < topologyPaths.length; ++j) {
                boolean found = false;
                for (int k = 0; k < racks.length; ++k) {
                    if (!topologyPaths[j].startsWith(racks[k])) continue;
                    found = true;
                    break;
                }
                Assert.assertTrue((boolean)found);
            }
        }
        boolean isOnSameRack = true;
        boolean isNotOnSameRack = true;
        Iterator i$ = locations.getLocatedBlocks().iterator();
        while (i$.hasNext() && (datanodes = (blk = (LocatedBlock)i$.next()).getLocations()).length > 1) {
            if (datanodes.length == 2) {
                isNotOnSameRack = !datanodes[0].getNetworkLocation().equals(datanodes[1].getNetworkLocation());
                break;
            }
            isOnSameRack = false;
            isNotOnSameRack = false;
            for (int i = 0; i < datanodes.length - 1; ++i) {
                LOG.info((Object)("datanode " + i + ": " + datanodes[i]));
                boolean onRack = false;
                for (int j = i + 1; j < datanodes.length; ++j) {
                    if (!datanodes[i].getNetworkLocation().equals(datanodes[j].getNetworkLocation())) continue;
                    onRack = true;
                }
                if (onRack) {
                    isOnSameRack = true;
                }
                if (!onRack) {
                    isNotOnSameRack = true;
                }
                if (isOnSameRack && isNotOnSameRack) break;
            }
            if (isOnSameRack && isNotOnSameRack) continue;
            break;
        }
        Assert.assertTrue((boolean)isOnSameRack);
        Assert.assertTrue((boolean)isNotOnSameRack);
    }

    private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
        Assert.assertTrue((boolean)fileSys.exists(name));
        fileSys.delete(name, true);
        Assert.assertTrue((!fileSys.exists(name) ? 1 : 0) != 0);
    }

    @Test
    public void testBadBlockReportOnTransfer() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        DistributedFileSystem fs = null;
        DFSClient dfsClient = null;
        LocatedBlocks blocks = null;
        int replicaCount = 0;
        short replFactor = 1;
        MiniHDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(2).buildHDFS();
        cluster.waitActive();
        fs = cluster.getFileSystem();
        dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), (Configuration)conf);
        Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
        DFSTestUtil.createFile((FileSystem)fs, file1, 1024L, replFactor, 0L);
        DFSTestUtil.waitReplication((FileSystem)fs, file1, replFactor);
        ExtendedBlock block = DFSTestUtil.getFirstBlock((FileSystem)fs, file1);
        int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
        Assert.assertEquals((String)"Corrupted too few blocks", (long)replFactor, (long)blockFilesCorrupted);
        replFactor = 2;
        fs.setReplication(file1, replFactor);
        blocks = dfsClient.getNamenode().getBlockLocations(file1.toString(), 0L, Long.MAX_VALUE);
        while (!blocks.get(0).isCorrupt()) {
            try {
                LOG.info((Object)"Waiting until block is marked as corrupt...");
                Thread.sleep(1000L);
            }
            catch (InterruptedException ie) {
                // empty catch block
            }
            blocks = dfsClient.getNamenode().getBlockLocations(file1.toString(), 0L, Long.MAX_VALUE);
        }
        replicaCount = blocks.get(0).getLocations().length;
        Assert.assertTrue((replicaCount == 1 ? 1 : 0) != 0);
        cluster.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runReplication(boolean simulated) throws IOException {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.namenode.replication.considerLoad", false);
        if (simulated) {
            SimulatedFSDataset.setFactory((Configuration)conf);
        }
        MiniHDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDatanodes).racks(racks).buildHDFS();
        cluster.waitActive();
        InetSocketAddress addr = new InetSocketAddress("localhost", cluster.getNameNodePort());
        DFSClient client = new DFSClient(addr, (Configuration)conf);
        DatanodeInfo[] info = client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
        Assert.assertEquals((String)"Number of Datanodes ", (long)numDatanodes, (long)info.length);
        DistributedFileSystem fileSys = cluster.getFileSystem();
        try {
            Path file1 = new Path("/smallblocktest.dat");
            this.writeFile((FileSystem)fileSys, file1, 3);
            this.checkFile((FileSystem)fileSys, file1, 3);
            this.cleanupFile((FileSystem)fileSys, file1);
            this.writeFile((FileSystem)fileSys, file1, 10);
            this.checkFile((FileSystem)fileSys, file1, 10);
            this.cleanupFile((FileSystem)fileSys, file1);
            this.writeFile((FileSystem)fileSys, file1, 4);
            this.checkFile((FileSystem)fileSys, file1, 4);
            this.cleanupFile((FileSystem)fileSys, file1);
            this.writeFile((FileSystem)fileSys, file1, 1);
            this.checkFile((FileSystem)fileSys, file1, 1);
            this.cleanupFile((FileSystem)fileSys, file1);
            this.writeFile((FileSystem)fileSys, file1, 2);
            this.checkFile((FileSystem)fileSys, file1, 2);
            this.cleanupFile((FileSystem)fileSys, file1);
        }
        finally {
            fileSys.close();
            cluster.shutdown();
        }
    }

    @Test
    public void testReplicationSimulatedStorag() throws IOException {
        this.runReplication(true);
    }

    @Test
    public void testReplication() throws IOException {
        this.runReplication(false);
    }

    private void waitForBlockReplication(String filename, ClientProtocol namenode, int expected, long maxWaitSec) throws IOException {
        long start = Time.now();
        LOG.info((Object)("Checking for block replication for " + filename));
        while (true) {
            boolean replOk = true;
            LocatedBlocks blocks = namenode.getBlockLocations(filename, 0L, Long.MAX_VALUE);
            for (LocatedBlock block : blocks.getLocatedBlocks()) {
                int actual = block.getLocations().length;
                if (actual >= expected) continue;
                LOG.info((Object)("Not enough replicas for " + block.getBlock() + " yet. Expecting " + expected + ", got " + actual + "."));
                replOk = false;
                break;
            }
            if (replOk) {
                return;
            }
            if (maxWaitSec > 0L && Time.now() - start > maxWaitSec * 1000L) {
                throw new IOException("Timedout while waiting for all blocks to  be replicated for " + filename);
            }
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException ignored) {
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPendingReplicationRetry() throws IOException {
        MiniDFSCluster cluster = null;
        int numDataNodes = 4;
        String testFile = "/replication-test-file";
        Path testPath = new Path(testFile);
        byte[] buffer = new byte[1024];
        for (int i = 0; i < buffer.length; ++i) {
            buffer[i] = 49;
        }
        try {
            HdfsConfiguration conf = new HdfsConfiguration();
            conf.set("dfs.replication", Integer.toString(numDataNodes));
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes).buildHDFS();
            ((MiniHDFSCluster)cluster).waitActive();
            DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", ((MiniHDFSCluster)cluster).getNameNodePort()), (Configuration)conf);
            FSDataOutputStream out = ((MiniHDFSCluster)cluster).getFileSystem().create(testPath);
            out.write(buffer);
            out.close();
            this.waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1L);
            ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(testFile, 0L, Long.MAX_VALUE).get(0).getBlock();
            cluster.shutdown();
            cluster = null;
            for (int i = 0; i < 25; ++i) {
                buffer[i] = 48;
            }
            int fileCount = 0;
            for (int dnIndex = 0; dnIndex < 3; ++dnIndex) {
                File blockFile = MiniHDFSCluster.getBlockFile(dnIndex, block);
                LOG.info((Object)("Checking for file " + blockFile));
                if (blockFile == null || !blockFile.exists()) continue;
                if (fileCount == 0) {
                    LOG.info((Object)("Deleting file " + blockFile));
                    Assert.assertTrue((boolean)blockFile.delete());
                } else {
                    LOG.info((Object)("Corrupting file " + blockFile));
                    long len = blockFile.length();
                    Assert.assertTrue((len > 50L ? 1 : 0) != 0);
                    RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
                    try {
                        blockOut.seek(len / 3L);
                        blockOut.write(buffer, 0, 25);
                    }
                    finally {
                        blockOut.close();
                    }
                }
                ++fileCount;
            }
            Assert.assertEquals((long)3L, (long)fileCount);
            LOG.info((Object)"Restarting minicluster after deleting a replica and corrupting 2 crcs");
            conf = new HdfsConfiguration();
            conf.set("dfs.replication", Integer.toString(numDataNodes));
            conf.set("dfs.namenode.replication.pending.timeout-sec", Integer.toString(2));
            conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
            conf.set("dfs.namenode.safemode.threshold-pct", "0.75f");
            cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(numDataNodes * 2).format(false).buildHDFS();
            ((MiniHDFSCluster)cluster).waitActive();
            dfsClient = new DFSClient(new InetSocketAddress("localhost", ((MiniHDFSCluster)cluster).getNameNodePort()), (Configuration)conf);
            this.waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1L);
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReplicateLenMismatchedBlock() throws Exception {
        MiniHDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)new HdfsConfiguration()).numDataNodes(2).buildHDFS();
        try {
            cluster.waitActive();
            this.changeBlockLen(cluster, -1);
            this.changeBlockLen(cluster, 1);
        }
        finally {
            cluster.shutdown();
        }
    }

    private void changeBlockLen(MiniHDFSCluster cluster, int lenDelta) throws IOException, InterruptedException, TimeoutException {
        Path fileName = new Path("/file1");
        boolean REPLICATION_FACTOR = true;
        DistributedFileSystem fs = cluster.getFileSystem();
        int fileLen = fs.getConf().getInt("dfs.bytes-per-checksum", 512);
        DFSTestUtil.createFile((FileSystem)fs, fileName, fileLen, (short)1, 0L);
        DFSTestUtil.waitReplication((FileSystem)fs, fileName, (short)1);
        ExtendedBlock block = DFSTestUtil.getFirstBlock((FileSystem)fs, fileName);
        for (int i = 0; i < cluster.getDataNodes().size() && !TestDatanodeBlockScanner.changeReplicaLength(block, i, lenDelta); ++i) {
        }
        fs.setReplication(fileName, (short)2);
        DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), fs.getConf());
        LocatedBlocks blocks = dfsClient.getNamenode().getBlockLocations(fileName.toString(), 0L, (long)fileLen);
        if (lenDelta < 0) {
            while (!blocks.get(0).isCorrupt() || 1 != blocks.get(0).getLocations().length) {
                Thread.sleep(100L);
                blocks = dfsClient.getNamenode().getBlockLocations(fileName.toString(), 0L, (long)fileLen);
            }
        } else {
            while (2 != blocks.get(0).getLocations().length) {
                Thread.sleep(100L);
                blocks = dfsClient.getNamenode().getBlockLocations(fileName.toString(), 0L, (long)fileLen);
            }
        }
        fs.delete(fileName, true);
    }
}

