package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.class */
public class TestDFSInputStreamBlockLocations {
    // 1048576 bytes (1 MiB).
    private static final int BLOCK_SIZE = 1048576;
    // Rack paths handed to the MiniDFSCluster builder in setup(); seven entries.
    private static final String[] RACKS = {"/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"};
    // Number of datanodes requested from the cluster builder; always equals RACKS.length.
    private static final int NUM_DATA_NODES = RACKS.length;
    // Value written to "dfs.replication" in setup().
    private static final short REPLICATION_FACTOR = 4;
    // Same value setup() writes to "dfs.namenode.stale.datanode.interval".
    private final int staleInterval = 8000;
    // 24 blocks; fileLength below equals numOfBlocks * BLOCK_SIZE.
    private final int numOfBlocks = 24;
    // 25165824 = 24 * 1048576; same size as the byte array written by createFile().
    private final int fileLength = 25165824;
    // Same value setup() writes to "dfs.client.read.prefetch.size" (half of fileLength).
    private final int dfsClientPrefetchSize = 12582912;
    // Same value setup() writes to "dfs.client.refresh.read-block-locations.ms"
    // when enableBlkExpiration is true.
    private final long dfsInputLocationsTimeout = 3600000;
    // Configuration built in setup() and passed to both the cluster and the client.
    private HdfsConfiguration conf;
    // Mini cluster started in setup() and shut down in teardown().
    private MiniDFSCluster dfsCluster;
    // Client connected to the mini cluster's NameNode port on localhost.
    private DFSClient dfsClient;
    // File system handle obtained from the mini cluster.
    private DistributedFileSystem fs;
    // Path of the file created by the running test; assigned from createFile().
    private Path filePath;
    // Test parameter: when true, setup() sets the block-location refresh interval.
    private boolean enableBlkExpiration;

    /**
     * Callback that performs one operation on a {@link DFSInputStream} and is
     * allowed to throw {@link IOException}. The deferred-registration tests pass
     * an implementation to {@code testWithRegistrationMethod}.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations$ThrowingConsumer.class */
    public interface ThrowingConsumer {
        /**
         * Runs the operation against the given stream.
         *
         * @param dFSInputStream stream to operate on
         * @throws IOException if the operation fails
         */
        void accept(DFSInputStream dFSInputStream) throws IOException;
    }

    /**
     * Supplies the parameter rows for the parameterized runner: one run with
     * the flag set to {@code true} and one with {@code false}.
     *
     * @return two single-element rows, {@code TRUE} first, then {@code FALSE}
     */
    @Parameterized.Parameters(name = "{index}: CacheExpirationConfig(Enable {0})")
    public static Collection<Object[]> getTestParameters() {
        final Object[][] rows = {
            {Boolean.TRUE},
            {Boolean.FALSE},
        };
        return Arrays.asList(rows);
    }

    /**
     * Receives the parameter produced by {@link #getTestParameters()}.
     *
     * @param bool whether setup() should configure the block-location refresh
     *             interval; must not be {@code null}
     */
    public TestDFSInputStreamBlockLocations(Boolean bool) {
        // Auto-unboxing; a null argument fails here with a NullPointerException.
        enableBlkExpiration = bool;
    }

    /**
     * Builds the configuration, starts a mini cluster with one datanode per
     * {@code RACKS} entry, and opens the client and file system handles used
     * by the tests.
     *
     * @throws IOException if the cluster or the client cannot be started
     */
    @Before
    public void setup() throws IOException {
        conf = new HdfsConfiguration();
        conf.setBoolean("dfs.namenode.avoid.read.stale.datanode", true);
        conf.setLong("dfs.namenode.stale.datanode.interval", staleInterval);
        conf.setInt("dfs.namenode.heartbeat.recheck-interval", 4000);
        conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
        conf.setInt("dfs.replication", REPLICATION_FACTOR);
        // Use this class's own block-size constant rather than a constant
        // borrowed from an unrelated test utility.
        conf.setLong("dfs.blocksize", BLOCK_SIZE);
        conf.setLong("dfs.client.read.prefetch.size", dfsClientPrefetchSize);
        if (enableBlkExpiration) {
            // Only the "enabled" parameterization sets the refresh interval.
            conf.setLong("dfs.client.refresh.read-block-locations.ms", dfsInputLocationsTimeout);
        }

        dfsCluster = new MiniDFSCluster.Builder(conf)
            .numDataNodes(NUM_DATA_NODES)
            .racks(RACKS)
            .build();
        dfsCluster.waitActive();
        Assert.assertEquals(NUM_DATA_NODES, dfsCluster.getDataNodes().size());

        InetSocketAddress nameNodeAddress =
            new InetSocketAddress("localhost", dfsCluster.getNameNodePort());
        dfsClient = new DFSClient(nameNodeAddress, conf);
        fs = dfsCluster.getFileSystem();
    }

    /**
     * Releases the client, the file system and the mini cluster.
     *
     * <p>The steps are chained with {@code finally} so that a failure while
     * closing the client or the file system does not leave the mini cluster
     * running.
     *
     * @throws IOException if closing the client or the file system fails
     */
    @After
    public void teardown() throws IOException {
        try {
            if (dfsClient != null) {
                dfsClient.close();
            }
        } finally {
            dfsClient = null;
            try {
                if (fs != null) {
                    // filePath stays null when createFile() failed before
                    // returning; avoid passing a null path to the file system.
                    if (filePath != null) {
                        fs.deleteOnExit(filePath);
                    }
                    fs.close();
                }
            } finally {
                fs = null;
                if (dfsCluster != null) {
                    dfsCluster.shutdown();
                    dfsCluster = null;
                }
            }
        }
    }

    /**
     * Exercises {@code DFSInputStream.refreshBlockLocations} in three steps:
     * with nothing to refresh, after a node is added to the stream's local
     * dead-node list, and with an address cache in which every datanode maps
     * to an unresolved address.
     *
     * @throws IOException if file creation or any stream operation fails
     */
    @Test
    public void testRefreshBlockLocations() throws IOException {
        final String fileName = "/test_cache_locations";
        filePath = createFile(fileName);

        // try-with-resources closes the stream exactly once, including when
        // close() itself is what fails.
        try (DFSInputStream fin = dfsClient.open(fileName)) {
            final Map<String, InetSocketAddress> noAddressCache = null;

            LocatedBlocks existing = fin.locatedBlocks;
            long lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();

            // Freshly opened stream: no refresh attempt is expected and
            // nothing may change.
            Assert.assertFalse("should not have attempted refresh",
                fin.refreshBlockLocations(noAddressCache));
            Assert.assertEquals("should not have updated lastRefreshedAt",
                lastRefreshedAt, fin.getLastRefreshedBlocksAtForTesting());
            Assert.assertSame("should not have modified locatedBlocks",
                existing, fin.locatedBlocks);

            // Put one live datanode on the local dead-node list; a refresh
            // attempt is now expected.
            fin.addToLocalDeadNodes(
                dfsClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE)[0]);
            Assert.assertTrue("should have attempted refresh",
                fin.refreshBlockLocations(noAddressCache));
            verifyChanged(fin, existing, lastRefreshedAt);

            // Re-baseline before the next step.
            lastRefreshedAt = fin.getLastRefreshedBlocksAtForTesting();
            existing = fin.locatedBlocks;

            // Map every datanode UUID to the same unresolved address
            // (createUnresolved performs no name lookup); a refresh attempt
            // is expected for this cache as well.
            Map<String, InetSocketAddress> mockAddressCache = new HashMap<>();
            InetSocketAddress unresolved = InetSocketAddress.createUnresolved("www.google.com", 80);
            for (DataNode dataNode : dfsCluster.getDataNodes()) {
                mockAddressCache.put(dataNode.getDatanodeUuid(), unresolved);
            }

            Assert.assertTrue("should have attempted refresh",
                fin.refreshBlockLocations(mockAddressCache));
            verifyChanged(fin, existing, lastRefreshedAt);
        }
    }

    /**
     * Asserts that a refresh took effect on the given stream: the refresh
     * timestamp is strictly greater than before, the located-blocks object is
     * a different instance, and the local dead-node collection is empty.
     *
     * @param dFSInputStream stream to inspect
     * @param locatedBlocks  located blocks captured before the refresh
     * @param j              refresh timestamp captured before the refresh
     */
    private void verifyChanged(DFSInputStream dFSInputStream, LocatedBlocks locatedBlocks, long j) {
        final long refreshedAt = dFSInputStream.getLastRefreshedBlocksAtForTesting();
        Assert.assertTrue("lastRefreshedAt should have incremented", j < refreshedAt);

        final LocatedBlocks current = dFSInputStream.locatedBlocks;
        Assert.assertNotSame("located blocks should have changed", locatedBlocks, current);

        final boolean noDeadNodes = dFSInputStream.getLocalDeadNodes().isEmpty();
        Assert.assertTrue("deadNodes should be empty", noDeadNodes);
    }

    /**
     * Runs the deferred-registration scenario using the no-argument
     * {@code read()} as the stream operation.
     *
     * @throws IOException if the scenario fails with an I/O error
     */
    @Test
    public void testDeferredRegistrationStatefulRead() throws IOException {
        testWithRegistrationMethod(DFSInputStream::read);
    }

    /**
     * Runs the deferred-registration scenario using a one-byte
     * {@code readFully} at position 0 as the stream operation.
     *
     * @throws IOException if the scenario fails with an I/O error
     */
    @Test
    public void testDeferredRegistrationPositionalRead() throws IOException {
        testWithRegistrationMethod(in -> in.readFully(0L, new byte[1]));
    }

    /**
     * Runs the deferred-registration scenario using {@code getAllBlocks()}
     * as the stream operation.
     *
     * @throws IOException if the scenario fails with an I/O error
     */
    @Test
    public void testDeferredRegistrationGetAllBlocks() throws IOException {
        testWithRegistrationMethod(DFSInputStream::getAllBlocks);
    }

    /**
     * Shared body of the deferred-registration tests. Checks that the client's
     * located-block refresher is not tracking the stream on open or after a
     * first operation, that it tracks the stream after an operation performed
     * once the last-refresh time has been back-dated past the timeout (only
     * when {@code enableBlkExpiration} is set), and that closing the stream
     * ends the tracking.
     *
     * @param throwingConsumer stream operation to run before each tracking check
     * @throws IOException if file creation or any stream operation fails
     */
    private void testWithRegistrationMethod(ThrowingConsumer throwingConsumer) throws IOException {
        final String fileName = "/test_cache_locations";
        filePath = createFile(fileName);

        DFSInputStream fin = null;
        try {
            fin = dfsClient.open(fileName);
            Assert.assertFalse("should not be tracking input stream on open",
                dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));

            // First operation, performed right after open.
            throwingConsumer.accept(fin);
            Assert.assertFalse("should not be tracking input stream after first read",
                dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));

            // Back-date the last refresh to one millisecond past the timeout.
            fin.setLastRefreshedBlocksAtForTesting(
                Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
            throwingConsumer.accept(fin);
            Assert.assertEquals(
                "SHOULD be tracking input stream on read after interval, only if enabled",
                enableBlkExpiration,
                dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
        } finally {
            // One cleanup path only: a failure during cleanup must not make
            // the cleanup run a second time.
            if (fin != null) {
                fin.close();
                Assert.assertFalse(dfsClient.getLocatedBlockRefresher().isInputStreamTracked(fin));
            }
            fs.delete(filePath, true);
        }
    }

    /**
     * Creates a file of {@code fileLength} zero bytes at the given path with
     * replication {@code REPLICATION_FACTOR}.
     *
     * @param str path of the file to create
     * @return the {@link Path} that was written
     * @throws IOException if creating, writing or closing the file fails
     */
    private Path createFile(String str) throws IOException {
        final Path path = new Path(str);
        // try-with-resources closes the stream exactly once, including when
        // close() itself is what fails.
        try (FSDataOutputStream out = fs.create(path, REPLICATION_FACTOR)) {
            out.write(new byte[fileLength]);
        }
        return path;
    }
}
