/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class TestDataNodeHotSwapVolumes {
    private static final Log LOG = LogFactory.getLog(TestDataNodeHotSwapVolumes.class);
    private static final int BLOCK_SIZE = 512;
    private MiniDFSCluster cluster;

    @After
    public void tearDown() {
        this.shutdown();
    }

    private void startDFSCluster(int numNameNodes, int numDataNodes) throws IOException {
        this.shutdown();
        Configuration conf = new Configuration();
        conf.setLong("dfs.blocksize", 512L);
        conf.setInt("dfs.heartbeat.interval", 1);
        conf.setInt("dfs.df.interval", 1000);
        conf.setInt("dfs.namenode.heartbeat.recheck-interval", 1000);
        MiniDFSNNTopology nnTopology = MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
        this.cluster = new MiniDFSCluster.Builder(conf).nnTopology(nnTopology).numDataNodes(numDataNodes).build();
        this.cluster.waitActive();
    }

    private void shutdown() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    private void createFile(Path path, int numBlocks) throws IOException, InterruptedException, TimeoutException {
        boolean replicateFactor = true;
        this.createFile(path, numBlocks, (short)1);
    }

    private void createFile(Path path, int numBlocks, short replicateFactor) throws IOException, InterruptedException, TimeoutException {
        this.createFile(0, path, numBlocks, replicateFactor);
    }

    private void createFile(int fsIdx, Path path, int numBlocks) throws IOException, InterruptedException, TimeoutException {
        boolean replicateFactor = true;
        this.createFile(fsIdx, path, numBlocks, (short)1);
    }

    private void createFile(int fsIdx, Path path, int numBlocks, short replicateFactor) throws IOException, TimeoutException, InterruptedException {
        boolean seed = false;
        DistributedFileSystem fs = this.cluster.getFileSystem(fsIdx);
        DFSTestUtil.createFile((FileSystem)fs, path, 512 * numBlocks, replicateFactor, 0L);
        DFSTestUtil.waitReplication((FileSystem)fs, path, replicateFactor);
    }

    private static void verifyFileLength(FileSystem fs, Path path, int numBlocks) throws IOException {
        FileStatus status = fs.getFileStatus(path);
        Assert.assertEquals((long)(numBlocks * 512), (long)status.getLen());
    }

    private static int getNumReplicas(FileSystem fs, Path file, int blockIdx) throws IOException {
        BlockLocation[] locs = fs.getFileBlockLocations(file, 0L, Long.MAX_VALUE);
        return locs.length < blockIdx + 1 ? 0 : locs[blockIdx].getNames().length;
    }

    private static void waitReplication(FileSystem fs, Path file, int blockIdx, int numReplicas) throws IOException, TimeoutException, InterruptedException {
        for (int attempts = 50; attempts > 0; --attempts) {
            if (TestDataNodeHotSwapVolumes.getNumReplicas(fs, file, blockIdx) == numReplicas) {
                return;
            }
            Thread.sleep(100L);
        }
        throw new TimeoutException("Timed out waiting the " + blockIdx + "-th block" + " of " + file + " to have " + numReplicas + " replicas.");
    }

    private static Collection<String> getDataDirs(DataNode datanode) {
        return datanode.getConf().getTrimmedStringCollection("dfs.datanode.data.dir");
    }

    @Test
    public void testParseChangedVolumes() throws IOException {
        this.startDFSCluster(1, 1);
        DataNode dn = this.cluster.getDataNodes().get(0);
        Configuration conf = dn.getConf();
        String oldPaths = conf.get("dfs.datanode.data.dir");
        ArrayList<StorageLocation> oldLocations = new ArrayList<StorageLocation>();
        for (String path : oldPaths.split(",")) {
            oldLocations.add(StorageLocation.parse((String)path));
        }
        Assert.assertFalse((boolean)oldLocations.isEmpty());
        String newPaths = "/foo/path1,/foo/path2";
        conf.set("dfs.datanode.data.dir", newPaths);
        DataNode.ChangedVolumes changedVolumes = dn.parseChangedVolumes();
        List newVolumes = changedVolumes.newLocations;
        Assert.assertEquals((long)2L, (long)newVolumes.size());
        Assert.assertEquals((Object)new File("/foo/path1").getAbsolutePath(), (Object)((StorageLocation)newVolumes.get(0)).getFile().getAbsolutePath());
        Assert.assertEquals((Object)new File("/foo/path2").getAbsolutePath(), (Object)((StorageLocation)newVolumes.get(1)).getFile().getAbsolutePath());
        List removedVolumes = changedVolumes.deactivateLocations;
        Assert.assertEquals((long)oldLocations.size(), (long)removedVolumes.size());
        for (int i = 0; i < removedVolumes.size(); ++i) {
            Assert.assertEquals((Object)((StorageLocation)oldLocations.get(i)).getFile(), (Object)((StorageLocation)removedVolumes.get(i)).getFile());
        }
    }

    @Test
    public void testParseChangedVolumesFailures() throws IOException {
        this.startDFSCluster(1, 1);
        DataNode dn = this.cluster.getDataNodes().get(0);
        Configuration conf = dn.getConf();
        try {
            conf.set("dfs.datanode.data.dir", "");
            dn.parseChangedVolumes();
            Assert.fail((String)"Should throw IOException: empty inputs.");
        }
        catch (IOException e) {
            GenericTestUtils.assertExceptionContains((String)"No directory is specified.", (Throwable)e);
        }
    }

    private void addVolumes(int numNewVolumes) throws ReconfigurationException {
        File volumeDir;
        File dataDir = new File(this.cluster.getDataDirectory());
        DataNode dn = this.cluster.getDataNodes().get(0);
        Configuration conf = dn.getConf();
        String oldDataDir = conf.get("dfs.datanode.data.dir");
        ArrayList<File> newVolumeDirs = new ArrayList<File>();
        StringBuilder newDataDirBuf = new StringBuilder(oldDataDir);
        int startIdx = oldDataDir.split(",").length + 1;
        while ((volumeDir = new File(dataDir, "data" + startIdx)).exists()) {
            ++startIdx;
        }
        for (int i = startIdx; i < startIdx + numNewVolumes; ++i) {
            File volumeDir2 = new File(dataDir, "data" + String.valueOf(i));
            newVolumeDirs.add(volumeDir2);
            volumeDir2.mkdirs();
            newDataDirBuf.append(",");
            newDataDirBuf.append(volumeDir2.toURI());
        }
        String newDataDir = newDataDirBuf.toString();
        dn.reconfigurePropertyImpl("dfs.datanode.data.dir", newDataDir);
        Assert.assertEquals((Object)newDataDir, (Object)conf.get("dfs.datanode.data.dir"));
        for (File volumeDir3 : newVolumeDirs) {
            File curDir = new File(volumeDir3, "current");
            Assert.assertTrue((boolean)curDir.exists());
            Assert.assertTrue((boolean)curDir.isDirectory());
        }
    }

    private List<List<Integer>> getNumBlocksReport(int namesystemIdx) {
        ArrayList<List<Integer>> results = new ArrayList<List<Integer>>();
        String bpid = this.cluster.getNamesystem(namesystemIdx).getBlockPoolId();
        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = this.cluster.getAllBlockReports(bpid);
        for (Map<DatanodeStorage, BlockListAsLongs> datanodeReport : blockReports) {
            ArrayList<Integer> numBlocksPerDN = new ArrayList<Integer>();
            for (BlockListAsLongs blocks : datanodeReport.values()) {
                numBlocksPerDN.add(blocks.getNumberOfBlocks());
            }
            results.add(numBlocksPerDN);
        }
        return results;
    }

    @Test
    public void testAddOneNewVolume() throws IOException, ReconfigurationException, InterruptedException, TimeoutException {
        this.startDFSCluster(1, 1);
        String bpid = this.cluster.getNamesystem().getBlockPoolId();
        int numBlocks = 10;
        this.addVolumes(1);
        Path testFile = new Path("/test");
        this.createFile(testFile, 10);
        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = this.cluster.getAllBlockReports(bpid);
        Assert.assertEquals((long)1L, (long)blockReports.size());
        Assert.assertEquals((long)3L, (long)blockReports.get(0).size());
        int minNumBlocks = Integer.MAX_VALUE;
        int maxNumBlocks = Integer.MIN_VALUE;
        for (BlockListAsLongs blockList : blockReports.get(0).values()) {
            minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
            maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
        }
        Assert.assertTrue((Math.abs(maxNumBlocks - maxNumBlocks) <= 1 ? 1 : 0) != 0);
        TestDataNodeHotSwapVolumes.verifyFileLength((FileSystem)this.cluster.getFileSystem(), testFile, 10);
    }

    @Test(timeout=60000L)
    public void testAddVolumesDuringWrite() throws IOException, InterruptedException, TimeoutException, ReconfigurationException {
        this.startDFSCluster(1, 1);
        String bpid = this.cluster.getNamesystem().getBlockPoolId();
        Path testFile = new Path("/test");
        this.createFile(testFile, 4);
        this.addVolumes(2);
        DFSTestUtil.appendFile((FileSystem)this.cluster.getFileSystem(), testFile, 4096);
        TestDataNodeHotSwapVolumes.verifyFileLength((FileSystem)this.cluster.getFileSystem(), testFile, 12);
        List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = this.cluster.getAllBlockReports(bpid);
        Assert.assertEquals((long)1L, (long)blockReports.size());
        Assert.assertEquals((long)4L, (long)blockReports.get(0).size());
        Map<DatanodeStorage, BlockListAsLongs> dnReport = blockReports.get(0);
        ArrayList<Integer> actualNumBlocks = new ArrayList<Integer>();
        for (BlockListAsLongs blockList : dnReport.values()) {
            actualNumBlocks.add(blockList.getNumberOfBlocks());
        }
        Collections.sort(actualNumBlocks);
        Assert.assertEquals(expectedNumBlocks, actualNumBlocks);
    }

    @Test
    public void testAddVolumesToFederationNN() throws IOException, TimeoutException, InterruptedException, ReconfigurationException {
        int numNameNodes = 2;
        boolean numDataNodes = true;
        this.startDFSCluster(2, 1);
        Path testFile = new Path("/test");
        this.createFile(0, testFile, 4);
        this.createFile(1, testFile, 4);
        int numNewVolumes = 2;
        this.addVolumes(2);
        DFSTestUtil.appendFile((FileSystem)this.cluster.getFileSystem(0), testFile, 4096);
        List<List<Integer>> actualNumBlocks = this.getNumBlocksReport(0);
        Assert.assertEquals((long)this.cluster.getDataNodes().size(), (long)actualNumBlocks.size());
        List<Integer> blocksOnFirstDN = actualNumBlocks.get(0);
        Collections.sort(blocksOnFirstDN);
        Assert.assertEquals(Arrays.asList(2, 2, 4, 4), blocksOnFirstDN);
        actualNumBlocks = this.getNumBlocksReport(1);
        Assert.assertEquals((long)4L, (long)actualNumBlocks.get(0).size());
        Assert.assertEquals((long)2L, (long)Collections.frequency((Collection)actualNumBlocks.get(0), 0));
    }

    @Test
    public void testRemoveOneVolume() throws ReconfigurationException, InterruptedException, TimeoutException, IOException {
        this.startDFSCluster(1, 1);
        boolean replFactor = true;
        Path testFile = new Path("/test");
        this.createFile(testFile, 10, (short)1);
        DataNode dn = this.cluster.getDataNodes().get(0);
        Collection<String> oldDirs = TestDataNodeHotSwapVolumes.getDataDirs(dn);
        String newDirs = oldDirs.iterator().next();
        dn.reconfigurePropertyImpl("dfs.datanode.data.dir", newDirs);
        TestDataNodeHotSwapVolumes.assertFileLocksReleased(new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
        dn.scheduleAllBlockReport(0L);
        try {
            DFSTestUtil.readFile((FileSystem)this.cluster.getFileSystem(), testFile);
            Assert.fail((String)"Expect to throw BlockMissingException.");
        }
        catch (BlockMissingException e) {
            GenericTestUtils.assertExceptionContains((String)"Could not obtain block", (Throwable)e);
        }
        Path newFile = new Path("/newFile");
        this.createFile(newFile, 6);
        String bpid = this.cluster.getNamesystem().getBlockPoolId();
        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = this.cluster.getAllBlockReports(bpid);
        Assert.assertEquals((long)1L, (long)blockReports.size());
        BlockListAsLongs blocksForVolume1 = blockReports.get(0).values().iterator().next();
        Assert.assertEquals((long)11L, (long)blocksForVolume1.getNumberOfBlocks());
    }

    @Test
    public void testReplicatingAfterRemoveVolume() throws InterruptedException, TimeoutException, IOException, ReconfigurationException {
        this.startDFSCluster(1, 2);
        DistributedFileSystem fs = this.cluster.getFileSystem();
        int replFactor = 2;
        Path testFile = new Path("/test");
        this.createFile(testFile, 4, (short)2);
        DataNode dn = this.cluster.getDataNodes().get(0);
        Collection<String> oldDirs = TestDataNodeHotSwapVolumes.getDataDirs(dn);
        String newDirs = oldDirs.iterator().next();
        dn.reconfigurePropertyImpl("dfs.datanode.data.dir", newDirs);
        TestDataNodeHotSwapVolumes.assertFileLocksReleased(new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
        dn.scheduleAllBlockReport(0L);
        DataNodeTestUtils.triggerDeletionReport(dn);
        TestDataNodeHotSwapVolumes.waitReplication((FileSystem)fs, testFile, 1, 1);
        DFSTestUtil.waitReplication((FileSystem)fs, testFile, (short)2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private static void assertFileLocksReleased(Collection<String> dirs) throws IOException {
        for (String dir : dirs) {
            FileLock lock;
            FileChannel channel;
            RandomAccessFile raf;
            File lockFile;
            block12: {
                StorageLocation sl = StorageLocation.parse((String)dir);
                lockFile = new File(sl.getFile(), "in_use.lock");
                raf = null;
                channel = null;
                lock = null;
                raf = new RandomAccessFile(lockFile, "rws");
                channel = raf.getChannel();
                lock = channel.tryLock();
                Assert.assertNotNull((String)String.format("Lock file at %s appears to be held by a different process.", lockFile.getAbsolutePath()), (Object)lock);
                if (lock == null) break block12;
                try {
                    lock.release();
                }
                catch (IOException e) {
                    LOG.warn((Object)String.format("I/O error releasing file lock %s.", lockFile.getAbsolutePath()), (Throwable)e);
                }
            }
            IOUtils.cleanup(null, (Closeable[])new Closeable[]{channel, raf});
            continue;
            catch (OverlappingFileLockException e) {
                block13: {
                    try {
                        Assert.fail((String)String.format("Must release lock file at %s.", lockFile.getAbsolutePath()));
                        if (lock == null) break block13;
                    }
                    catch (Throwable throwable) {
                        if (lock != null) {
                            try {
                                lock.release();
                            }
                            catch (IOException e2) {
                                LOG.warn((Object)String.format("I/O error releasing file lock %s.", lockFile.getAbsolutePath()), (Throwable)e2);
                            }
                        }
                        IOUtils.cleanup(null, (Closeable[])new Closeable[]{channel, raf});
                        throw throwable;
                    }
                    try {
                        lock.release();
                    }
                    catch (IOException e3) {
                        LOG.warn((Object)String.format("I/O error releasing file lock %s.", lockFile.getAbsolutePath()), (Throwable)e3);
                    }
                }
                IOUtils.cleanup(null, (Closeable[])new Closeable[]{channel, raf});
            }
        }
    }
}

