/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.CacheStats;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
import org.apache.log4j.Appender;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

@NotThreadSafe
public class TestFsDatasetCache {
    private static final Logger LOG = LoggerFactory.getLogger(TestFsDatasetCache.class);
    public static final long CACHE_CAPACITY = 65536L;
    private static final long PAGE_SIZE;
    private static final long BLOCK_SIZE;
    private static Configuration conf;
    private static MiniDFSCluster cluster;
    private static FileSystem fs;
    private static NameNode nn;
    private static FSImage fsImage;
    private static DataNode dn;
    private static FsDatasetSpi<?> fsd;
    private static DatanodeProtocolClientSideTranslatorPB spyNN;
    private static ReadWriteLock lock;
    private static final CacheStats.PageRounder rounder;
    private static NativeIO.POSIX.CacheManipulator prevCacheManipulator;
    private static DataNodeFaultInjector oldInjector;

    @BeforeClass
    public static void setUpClass() throws Exception {
        oldInjector = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set((DataNodeFaultInjector)new DataNodeFaultInjector(){

            public void startOfferService() throws Exception {
                lock.readLock().lock();
            }

            public void endOfferService() throws Exception {
                lock.readLock().unlock();
            }
        });
    }

    @AfterClass
    public static void tearDownClass() throws Exception {
        DataNodeFaultInjector.set((DataNodeFaultInjector)oldInjector);
    }

    @Before
    public void setUp() throws Exception {
        conf = new HdfsConfiguration();
        conf.setLong("dfs.namenode.path.based.cache.refresh.interval.ms", 100L);
        conf.setLong("dfs.cachereport.intervalMsec", 500L);
        conf.setLong("dfs.blocksize", BLOCK_SIZE);
        conf.setLong("dfs.datanode.max.locked.memory", 65536L);
        conf.setLong("dfs.heartbeat.interval", 1L);
        conf.setInt("dfs.datanode.fsdatasetcache.max.threads.per.volume", 10);
        prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
        NativeIO.POSIX.setCacheManipulator((NativeIO.POSIX.CacheManipulator)new NativeIO.POSIX.NoMlockCacheManipulator());
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        cluster.waitActive();
        fs = cluster.getFileSystem();
        nn = cluster.getNameNode();
        fsImage = nn.getFSImage();
        dn = cluster.getDataNodes().get(0);
        fsd = dn.getFSDataset();
        spyNN = InternalDataNodeTestUtils.spyOnBposToNN(dn, nn);
    }

    @After
    public void tearDown() throws Exception {
        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
        if (fs != null) {
            fs.close();
            fs = null;
        }
        if (cluster != null) {
            cluster.shutdown();
            cluster = null;
        }
        NativeIO.POSIX.setCacheManipulator((NativeIO.POSIX.CacheManipulator)prevCacheManipulator);
    }

    private static void setHeartbeatResponse(DatanodeCommand[] cmds) throws Exception {
        lock.writeLock().lock();
        try {
            NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, fsImage.getLastAppliedOrWrittenTxId());
            HeartbeatResponse response = new HeartbeatResponse(cmds, ha, null, ThreadLocalRandom.current().nextLong() | 1L);
            ((DatanodeProtocolClientSideTranslatorPB)Mockito.doReturn((Object)response).when((Object)spyNN)).sendHeartbeat((DatanodeRegistration)ArgumentMatchers.any(), (StorageReport[])ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (VolumeFailureSummary)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), (SlowPeerReports)ArgumentMatchers.any(SlowPeerReports.class), (SlowDiskReports)ArgumentMatchers.any(SlowDiskReports.class));
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
        return TestFsDatasetCache.cacheBlocks(new HdfsBlockLocation[]{loc});
    }

    private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] locs) {
        return new DatanodeCommand[]{TestFsDatasetCache.getResponse(locs, 9)};
    }

    private static DatanodeCommand[] uncacheBlock(HdfsBlockLocation loc) {
        return TestFsDatasetCache.uncacheBlocks(new HdfsBlockLocation[]{loc});
    }

    private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] locs) {
        return new DatanodeCommand[]{TestFsDatasetCache.getResponse(locs, 10)};
    }

    private static DatanodeCommand getResponse(HdfsBlockLocation[] locs, int action) {
        String bpid = locs[0].getLocatedBlock().getBlock().getBlockPoolId();
        long[] blocks = new long[locs.length];
        for (int i = 0; i < locs.length; ++i) {
            blocks[i] = locs[i].getLocatedBlock().getBlock().getBlockId();
        }
        return new BlockIdCommand(action, bpid, blocks);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static long[] getBlockSizes(HdfsBlockLocation[] locs) throws Exception {
        long[] sizes = new long[locs.length];
        for (int i = 0; i < locs.length; ++i) {
            HdfsBlockLocation loc = locs[i];
            String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
            Block block = loc.getLocatedBlock().getBlock().getLocalBlock();
            ExtendedBlock extBlock = new ExtendedBlock(bpid, block);
            FileInputStream blockInputStream = null;
            FileChannel blockChannel = null;
            try {
                blockInputStream = (FileInputStream)fsd.getBlockInputStream(extBlock, 0L);
                blockChannel = blockInputStream.getChannel();
                sizes[i] = blockChannel.size();
            }
            catch (Throwable throwable) {
                IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{blockChannel, blockInputStream});
                throw throwable;
            }
            IOUtils.cleanupWithLogger((Logger)LOG, (Closeable[])new Closeable[]{blockChannel, blockInputStream});
        }
        return sizes;
    }

    private void testCacheAndUncacheBlock() throws Exception {
        long cmds;
        MetricsRecordBuilder dnMetrics;
        int i;
        LOG.info("beginning testCacheAndUncacheBlock");
        int NUM_BLOCKS = 5;
        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
        Assert.assertEquals((long)0L, (long)fsd.getNumBlocksCached());
        Path testFile = new Path("/testCacheBlock");
        long testFileLen = BLOCK_SIZE * 5L;
        DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 43962L);
        HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0L, testFileLen);
        Assert.assertEquals((String)"Unexpected number of blocks", (long)5L, (long)locs.length);
        long[] blockSizes = TestFsDatasetCache.getBlockSizes(locs);
        long cacheCapacity = fsd.getCacheCapacity();
        long cacheUsed = fsd.getCacheUsed();
        long current = 0L;
        Assert.assertEquals((String)"Unexpected cache capacity", (long)65536L, (long)cacheCapacity);
        Assert.assertEquals((String)"Unexpected amount of cache used", (long)current, (long)cacheUsed);
        long numCacheCommands = 0L;
        long numUncacheCommands = 0L;
        for (i = 0; i < 5; ++i) {
            TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.cacheBlock(locs[i]));
            current = DFSTestUtil.verifyExpectedCacheUsage(current + blockSizes[i], i + 1, fsd);
            dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
            cmds = MetricsAsserts.getLongCounter((String)"BlocksCached", (MetricsRecordBuilder)dnMetrics);
            Assert.assertTrue((String)("Expected more cache requests from the NN (" + cmds + " <= " + numCacheCommands + ")"), (cmds > numCacheCommands ? 1 : 0) != 0);
            numCacheCommands = cmds;
        }
        for (i = 0; i < 5; ++i) {
            TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.uncacheBlock(locs[i]));
            current = DFSTestUtil.verifyExpectedCacheUsage(current - blockSizes[i], 4 - i, fsd);
            dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
            cmds = MetricsAsserts.getLongCounter((String)"BlocksUncached", (MetricsRecordBuilder)dnMetrics);
            Assert.assertTrue((String)"Expected more uncache requests from the NN", (cmds > numUncacheCommands ? 1 : 0) != 0);
            numUncacheCommands = cmds;
        }
        LOG.info("finishing testCacheAndUncacheBlock");
    }

    @Test(timeout=600000L)
    public void testCacheAndUncacheBlockSimple() throws Exception {
        this.testCacheAndUncacheBlock();
    }

    @Test(timeout=600000L)
    public void testCacheAndUncacheBlockWithRetries() throws Exception {
        NativeIO.POSIX.setCacheManipulator((NativeIO.POSIX.CacheManipulator)new NativeIO.POSIX.NoMlockCacheManipulator(){
            private final Set<String> seenIdentifiers = new HashSet<String>();

            public void mlock(String identifier, ByteBuffer mmap, long length) throws IOException {
                if (this.seenIdentifiers.contains(identifier)) {
                    LOG.info("mlocking " + identifier);
                    return;
                }
                this.seenIdentifiers.add(identifier);
                throw new IOException("injecting IOException during mlock of " + identifier);
            }
        });
        this.testCacheAndUncacheBlock();
    }

    @Test(timeout=600000L)
    public void testFilesExceedMaxLockedMemory() throws Exception {
        LOG.info("beginning testFilesExceedMaxLockedMemory");
        int numFiles = 5;
        long fileSize = 16384L;
        Path[] testFiles = new Path[5];
        HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[5][];
        long[] fileSizes = new long[5];
        for (int i = 0; i < 5; ++i) {
            testFiles[i] = new Path("/testFilesExceedMaxLockedMemory-" + i);
            DFSTestUtil.createFile(fs, testFiles[i], 16384L, (short)1, 3578L);
            fileLocs[i] = (HdfsBlockLocation[])fs.getFileBlockLocations(testFiles[i], 0L, 16384L);
            long[] sizes = TestFsDatasetCache.getBlockSizes(fileLocs[i]);
            for (int j = 0; j < sizes.length; ++j) {
                int n = i;
                fileSizes[n] = fileSizes[n] + sizes[j];
            }
        }
        long total = 0L;
        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
        for (int i = 0; i < 4; ++i) {
            TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.cacheBlocks(fileLocs[i]));
            total = DFSTestUtil.verifyExpectedCacheUsage(rounder.roundUp(total + fileSizes[i]), 4 * (i + 1), fsd);
        }
        final LogVerificationAppender appender = new LogVerificationAppender();
        org.apache.log4j.Logger logger = org.apache.log4j.Logger.getRootLogger();
        logger.addAppender((Appender)appender);
        TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.cacheBlocks(fileLocs[4]));
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                int lines = appender.countLinesWithMessage("could not reserve more bytes in the cache: ");
                return lines > 0;
            }
        }, (long)500L, (long)30000L);
        Assert.assertTrue((String)"Expected more than 0 failed cache attempts", (fsd.getNumBlocksFailedToCache() > 0L ? 1 : 0) != 0);
        int curCachedBlocks = 16;
        for (int i = 0; i < 4; ++i) {
            TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.uncacheBlocks(fileLocs[i]));
            long uncachedBytes = rounder.roundUp(fileSizes[i]);
            curCachedBlocks = (int)((long)curCachedBlocks - uncachedBytes / BLOCK_SIZE);
            DFSTestUtil.verifyExpectedCacheUsage(total -= uncachedBytes, curCachedBlocks, fsd);
        }
        LOG.info("finishing testFilesExceedMaxLockedMemory");
    }

    @Test(timeout=600000L)
    public void testUncachingBlocksBeforeCachingFinishes() throws Exception {
        LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
        int NUM_BLOCKS = 5;
        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
        Path testFile = new Path("/testCacheBlock");
        long testFileLen = BLOCK_SIZE * 5L;
        DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 43962L);
        HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0L, testFileLen);
        Assert.assertEquals((String)"Unexpected number of blocks", (long)5L, (long)locs.length);
        long[] blockSizes = TestFsDatasetCache.getBlockSizes(locs);
        long cacheCapacity = fsd.getCacheCapacity();
        long cacheUsed = fsd.getCacheUsed();
        long current = 0L;
        Assert.assertEquals((String)"Unexpected cache capacity", (long)65536L, (long)cacheCapacity);
        Assert.assertEquals((String)"Unexpected amount of cache used", (long)current, (long)cacheUsed);
        NativeIO.POSIX.setCacheManipulator((NativeIO.POSIX.CacheManipulator)new NativeIO.POSIX.NoMlockCacheManipulator(){

            public void mlock(String identifier, ByteBuffer mmap, long length) throws IOException {
                LOG.info("An mlock operation is starting on " + identifier);
                try {
                    Thread.sleep(3000L);
                }
                catch (InterruptedException e) {
                    Assert.fail();
                }
            }
        });
        for (int i = 0; i < 5; ++i) {
            TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.cacheBlock(locs[i]));
            current = DFSTestUtil.verifyExpectedCacheUsage(current + blockSizes[i], i + 1, fsd);
        }
        TestFsDatasetCache.setHeartbeatResponse(new DatanodeCommand[]{TestFsDatasetCache.getResponse(locs, 10)});
        current = DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
        LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
    }

    @Test(timeout=60000L)
    public void testUncacheUnknownBlock() throws Exception {
        Path fileName = new Path("/testUncacheUnknownBlock");
        int fileLen = 4096;
        DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 65021L);
        HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(fileName, 0L, (long)fileLen);
        TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.uncacheBlocks(locs));
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                return fsd.getNumBlocksFailedToUncache() > 0L;
            }
        }, (long)100L, (long)10000L);
    }

    @Test(timeout=600000L)
    public void testPageRounder() throws Exception {
        Path fileName = new Path("/testPageRounder");
        int smallBlocks = 512;
        Assert.assertTrue((String)"Page size should be greater than smallBlocks!", (PAGE_SIZE > 512L ? 1 : 0) != 0);
        int numBlocks = 5;
        int fileLen = 2560;
        FSDataOutputStream out = fs.create(fileName, false, 4096, (short)1, 512L);
        out.write(new byte[2560]);
        out.close();
        HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(fileName, 0L, 2560L);
        TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.cacheBlocks(locs));
        DFSTestUtil.verifyExpectedCacheUsage(PAGE_SIZE * 5L, 5L, fsd);
        TestFsDatasetCache.setHeartbeatResponse(TestFsDatasetCache.uncacheBlocks(locs));
        DFSTestUtil.verifyExpectedCacheUsage(0L, 0L, fsd);
    }

    @Test(timeout=60000L)
    public void testUncacheQuiesces() throws Exception {
        Path fileName = new Path("/testUncacheQuiesces");
        int fileLen = 4096;
        DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 65021L);
        DistributedFileSystem dfs = cluster.getFileSystem();
        dfs.addCachePool(new CachePoolInfo("pool"));
        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool").setPath(fileName).setReplication(Short.valueOf((short)3)).build());
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
                long blocksCached = MetricsAsserts.getLongCounter((String)"BlocksCached", (MetricsRecordBuilder)dnMetrics);
                return blocksCached > 0L;
            }
        }, (long)1000L, (long)30000L);
        dfs.removeCacheDirective(1L);
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
                long blocksUncached = MetricsAsserts.getLongCounter((String)"BlocksUncached", (MetricsRecordBuilder)dnMetrics);
                return blocksUncached > 0L;
            }
        }, (long)1000L, (long)30000L);
        Thread.sleep(10000L);
        MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
        MetricsAsserts.assertCounter((String)"BlocksCached", (long)1L, (MetricsRecordBuilder)dnMetrics);
        MetricsAsserts.assertCounter((String)"BlocksUncached", (long)1L, (MetricsRecordBuilder)dnMetrics);
    }

    @Test(timeout=60000L)
    public void testReCacheAfterUncache() throws Exception {
        final int TOTAL_BLOCKS_PER_CACHE = Ints.checkedCast((long)(65536L / BLOCK_SIZE));
        BlockReaderTestUtil.enableHdfsCachingTracing();
        Assert.assertEquals((long)0L, (long)(65536L % BLOCK_SIZE));
        Path SMALL_FILE = new Path("/smallFile");
        DFSTestUtil.createFile(fs, SMALL_FILE, BLOCK_SIZE, (short)1, 51966L);
        Path BIG_FILE = new Path("/bigFile");
        DFSTestUtil.createFile(fs, BIG_FILE, (long)TOTAL_BLOCKS_PER_CACHE * BLOCK_SIZE, (short)1, 48879L);
        final DistributedFileSystem dfs = cluster.getFileSystem();
        dfs.addCachePool(new CachePoolInfo("pool"));
        long bigCacheDirectiveId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool").setPath(BIG_FILE).setReplication(Short.valueOf((short)1)).build());
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
                long blocksCached = MetricsAsserts.getLongCounter((String)"BlocksCached", (MetricsRecordBuilder)dnMetrics);
                if (blocksCached != (long)TOTAL_BLOCKS_PER_CACHE) {
                    LOG.info("waiting for " + TOTAL_BLOCKS_PER_CACHE + " to be cached.   Right now only " + blocksCached + " blocks are cached.");
                    return false;
                }
                LOG.info(TOTAL_BLOCKS_PER_CACHE + " blocks are now cached.");
                return true;
            }
        }, (long)1000L, (long)30000L);
        final long shortCacheDirectiveId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool").setPath(SMALL_FILE).setReplication(Short.valueOf((short)1)).build());
        Thread.sleep(10000L);
        MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
        Assert.assertEquals((long)TOTAL_BLOCKS_PER_CACHE, (long)MetricsAsserts.getLongCounter((String)"BlocksCached", (MetricsRecordBuilder)dnMetrics));
        dfs.removeCacheDirective(bigCacheDirectiveId);
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                try {
                    CacheDirectiveEntry entry;
                    RemoteIterator iter = dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().build());
                    while ((entry = (CacheDirectiveEntry)iter.next()).getInfo().getId() != shortCacheDirectiveId) {
                    }
                    if (entry.getStats().getFilesCached() != 1L) {
                        LOG.info("waiting for directive " + shortCacheDirectiveId + " to be cached.  stats = " + entry.getStats());
                        return false;
                    }
                    LOG.info("directive " + shortCacheDirectiveId + " has been cached.");
                }
                catch (IOException e) {
                    Assert.fail((String)("unexpected exception" + e.toString()));
                }
                return true;
            }
        }, (long)1000L, (long)30000L);
        dfs.removeCacheDirective(shortCacheDirectiveId);
    }

    static {
        BLOCK_SIZE = PAGE_SIZE = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
        cluster = null;
        lock = new ReentrantReadWriteLock(true);
        rounder = new CacheStats.PageRounder();
        GenericTestUtils.setLogLevel((Logger)LoggerFactory.getLogger(FsDatasetCache.class), (Level)Level.DEBUG);
    }
}

