package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.class */
public class TestAbfsInputStream extends AbstractAbfsIntegrationTest {
    private static final int ONE_KB = 1024;
    private static final int TWO_KB = 2048;
    private static final int THREE_KB = 3072;
    private static final int SIXTEEN_KB = 16384;
    private static final int FORTY_EIGHT_KB = 49152;
    private static final int ONE_MB = 1048576;
    private static final int FOUR_MB = 4194304;
    private static final int EIGHT_MB = 8388608;
    private static final int TEST_READAHEAD_DEPTH_2 = 2;
    private static final int TEST_READAHEAD_DEPTH_4 = 4;
    private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000;
    private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD = 30000;
    private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16777216;

    @Override // org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest
    public void teardown() throws Exception {
        super.teardown();
        ReadBufferManager.getBufferManager().testResetReadBufferManager();
    }

    private AbfsRestOperation getMockRestOp() {
        AbfsRestOperation abfsRestOperation = (AbfsRestOperation) Mockito.mock(AbfsRestOperation.class);
        AbfsHttpOperation abfsHttpOperation = (AbfsHttpOperation) Mockito.mock(AbfsHttpOperation.class);
        Mockito.when(Long.valueOf(abfsHttpOperation.getBytesReceived())).thenReturn(1024L);
        Mockito.when(abfsRestOperation.getResult()).thenReturn(abfsHttpOperation);
        Mockito.when(abfsRestOperation.getSasToken()).thenReturn(TestCachedSASToken.getTestCachedSASTokenInstance().get());
        return abfsRestOperation;
    }

    private AbfsClient getMockAbfsClient() {
        AbfsClient abfsClient = (AbfsClient) Mockito.mock(AbfsClient.class);
        Mockito.when(abfsClient.getAbfsPerfTracker()).thenReturn(new AbfsPerfTracker("test", getAccountName(), getConfiguration()));
        return abfsClient;
    }

    private AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, String str) throws IOException {
        AbfsInputStream abfsInputStream = new AbfsInputStream(abfsClient, (FileSystem.Statistics) null, "/" + str, 3072L, new AbfsInputStreamContext(-1L).withReadBufferSize(1024).withReadAheadQueueDepth(10).withReadAheadBlockSize(1024), "eTag", getTestTracingContext(null, false));
        abfsInputStream.setCachedSasToken(TestCachedSASToken.getTestCachedSASTokenInstance());
        return abfsInputStream;
    }

    public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, String str, int i, String str2, int i2, int i3, boolean z, int i4) throws IOException {
        AbfsInputStream abfsInputStream = new AbfsInputStream(abfsClient, (FileSystem.Statistics) null, "/" + str, i, new AbfsInputStreamContext(-1L).withReadBufferSize(i3).withReadAheadQueueDepth(i2).withShouldReadBufferSizeAlways(z).withReadAheadBlockSize(i4), str2, getTestTracingContext(getFileSystem(), false));
        abfsInputStream.setCachedSasToken(TestCachedSASToken.getTestCachedSASTokenInstance());
        return abfsInputStream;
    }

    private void queueReadAheads(AbfsInputStream abfsInputStream) {
        ReadBufferManager.getBufferManager().queueReadAhead(abfsInputStream, 0L, 1024, abfsInputStream.getTracingContext());
        ReadBufferManager.getBufferManager().queueReadAhead(abfsInputStream, 1024L, 1024, abfsInputStream.getTracingContext());
        ReadBufferManager.getBufferManager().queueReadAhead(abfsInputStream, 2048L, TWO_KB, abfsInputStream.getTracingContext());
    }

    private void verifyReadCallCount(AbfsClient abfsClient, int i) throws AzureBlobFileSystemException, InterruptedException {
        Thread.sleep(1000L);
        ((AbfsClient) Mockito.verify(abfsClient, Mockito.times(i))).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
    }

    private void checkEvictedStatus(AbfsInputStream abfsInputStream, int i, boolean z) throws Exception {
        Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000);
        for (int completedReadListSize = ReadBufferManager.getBufferManager().getCompletedReadListSize(); completedReadListSize > 0; completedReadListSize--) {
            ReadBufferManager.getBufferManager().callTryEvict();
        }
        if (z) {
            LambdaTestUtils.intercept(IOException.class, () -> {
                return Integer.valueOf(abfsInputStream.read(i, new byte[1024], 0, 1024));
            });
        } else {
            abfsInputStream.read(i, new byte[1024], 0, 1024);
        }
    }

    public TestAbfsInputStream() throws Exception {
        ReadBufferManager.getBufferManager();
        ReadBufferManager.setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
    }

    private void writeBufferToNewFile(Path path, byte[] bArr) throws IOException {
        AzureBlobFileSystem fileSystem = getFileSystem();
        fileSystem.create(path);
        FSDataOutputStream append = fileSystem.append(path);
        append.write(bArr);
        append.close();
    }

    private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, byte[] bArr, AbfsRestOperationType abfsRestOperationType) throws IOException, ExecutionException, InterruptedException {
        byte[] bArr2 = new byte[bArr.length];
        getFileSystem().openFile(path).withFileStatus(fileStatus);
        assertEquals(String.format("Open with fileStatus [from %s result]: Incorrect number of bytes read", abfsRestOperationType), bArr.length, ((FSDataInputStream) r0.build().get()).read(bArr2));
        assertArrayEquals(String.format("Open with fileStatus [from %s result]: Incorrect read data", abfsRestOperationType), bArr2, bArr);
    }

    private void checkGetPathStatusCalls(Path path, FileStatus fileStatus, AzureBlobFileSystemStore azureBlobFileSystemStore, AbfsClient abfsClient, AbfsRestOperationType abfsRestOperationType, TracingContext tracingContext) throws IOException {
        azureBlobFileSystemStore.openFileForRead(path, Optional.ofNullable(new OpenFileParameters().withStatus(fileStatus)), (FileSystem.Statistics) null, tracingContext);
        ((AbfsClient) Mockito.verify(abfsClient, Mockito.times(0).description(String.format("FileStatus [from %s result] provided, GetFileStatus should not be invoked", abfsRestOperationType)))).getPathStatus(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        azureBlobFileSystemStore.openFileForRead(path, Optional.empty(), (FileSystem.Statistics) null, tracingContext);
        ((AbfsClient) Mockito.verify(abfsClient, Mockito.times(1).description("GetPathStatus should be invoked when FileStatus not provided"))).getPathStatus(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        Mockito.reset(new AbfsClient[]{abfsClient});
    }

    @Test
    public void testOpenFileWithOptions() throws Exception {
        AzureBlobFileSystem fileSystem = getFileSystem();
        Path path = new Path("/testFolder/testFile0");
        Path path2 = new Path("/testFolder/testFile1");
        fileSystem.mkdirs(new Path("/testFolder"));
        byte[] bArr = new byte[5];
        byte[] bArr2 = new byte[getConfiguration().getReadBufferSize() + 5];
        new Random().nextBytes(bArr);
        new Random().nextBytes(bArr2);
        writeBufferToNewFile(path, bArr);
        writeBufferToNewFile(path2, bArr2);
        FileStatus[] fileStatusArr = {fileSystem.getFileStatus(path), fileSystem.getFileStatus(path2)};
        FileStatus[] listStatus = fileSystem.listStatus(new Path("/testFolder"));
        verifyOpenWithProvidedStatus(path, fileStatusArr[0], bArr, AbfsRestOperationType.GetPathStatus);
        verifyOpenWithProvidedStatus(path2, fileStatusArr[1], bArr2, AbfsRestOperationType.GetPathStatus);
        verifyOpenWithProvidedStatus(path, listStatus[0], bArr, AbfsRestOperationType.ListPaths);
        verifyOpenWithProvidedStatus(path2, listStatus[1], bArr2, AbfsRestOperationType.ListPaths);
        AzureBlobFileSystemStore abfsStore = getAbfsStore(fileSystem);
        AbfsClient abfsClient = (AbfsClient) Mockito.spy(getAbfsClient(abfsStore));
        setAbfsClient(abfsStore, abfsClient);
        TracingContext testTracingContext = getTestTracingContext(fileSystem, false);
        checkGetPathStatusCalls(path, fileStatusArr[0], abfsStore, abfsClient, AbfsRestOperationType.GetPathStatus, testTracingContext);
        checkGetPathStatusCalls(path2, fileStatusArr[1], abfsStore, abfsClient, AbfsRestOperationType.GetPathStatus, testTracingContext);
        checkGetPathStatusCalls(path, listStatus[0], abfsStore, abfsClient, AbfsRestOperationType.ListPaths, testTracingContext);
        checkGetPathStatusCalls(path2, listStatus[1], abfsStore, abfsClient, AbfsRestOperationType.ListPaths, testTracingContext);
        fileStatusArr[0].setPath(new Path("wrongPath"));
        LambdaTestUtils.intercept(ExecutionException.class, () -> {
            verifyOpenWithProvidedStatus(path, fileStatusArr[0], bArr, AbfsRestOperationType.GetPathStatus);
        });
    }

    @Test
    public void testFailedReadAhead() throws Exception {
        AbfsClient mockAbfsClient = getMockAbfsClient();
        ((AbfsClient) Mockito.doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Thread-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Thread-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error RAH-Thread-Z")}).doReturn(getMockRestOp()).when(mockAbfsClient)).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream abfsInputStream = getAbfsInputStream(mockAbfsClient, "testFailedReadAhead.txt");
        LambdaTestUtils.intercept(IOException.class, () -> {
            return Integer.valueOf(abfsInputStream.read(new byte[1024]));
        });
        verifyReadCallCount(mockAbfsClient, 3);
        checkEvictedStatus(abfsInputStream, 0, false);
    }

    @Test
    public void testFailedReadAheadEviction() throws Exception {
        AbfsClient mockAbfsClient = getMockAbfsClient();
        getMockRestOp();
        ReadBufferManager.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
        ((AbfsClient) Mockito.doThrow(new Throwable[]{new TimeoutException("Internal Server error")}).when(mockAbfsClient)).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream abfsInputStream = getAbfsInputStream(mockAbfsClient, "testFailedReadAheadEviction.txt");
        ReadBuffer readBuffer = new ReadBuffer();
        readBuffer.setStatus(ReadBufferStatus.READ_FAILED);
        ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(readBuffer);
        ReadBufferManager.getBufferManager().queueReadAhead(abfsInputStream, 0L, 1024, getTestTracingContext(getFileSystem(), true));
    }

    @Test
    public void testOlderReadAheadFailure() throws Exception {
        AbfsClient mockAbfsClient = getMockAbfsClient();
        AbfsRestOperation mockRestOp = getMockRestOp();
        ((AbfsClient) Mockito.doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Z")}).doReturn(mockRestOp).doReturn(mockRestOp).when(mockAbfsClient)).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream abfsInputStream = getAbfsInputStream(mockAbfsClient, "testOlderReadAheadFailure.txt");
        LambdaTestUtils.intercept(IOException.class, () -> {
            return Integer.valueOf(abfsInputStream.read(new byte[1024]));
        });
        verifyReadCallCount(mockAbfsClient, 3);
        Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
        abfsInputStream.read(1024L, new byte[1024], 0, 1024);
        verifyReadCallCount(mockAbfsClient, TEST_READAHEAD_DEPTH_4);
        checkEvictedStatus(abfsInputStream, 0, false);
    }

    @Test
    public void testSuccessfulReadAhead() throws Exception {
        AbfsClient mockAbfsClient = getMockAbfsClient();
        AbfsRestOperation mockRestOp = getMockRestOp();
        ((AbfsClient) Mockito.doReturn(mockRestOp).doReturn(mockRestOp).doReturn(mockRestOp).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Z")}).when(mockAbfsClient)).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream abfsInputStream = getAbfsInputStream(mockAbfsClient, "testSuccessfulReadAhead.txt");
        int completedReadListSize = ReadBufferManager.getBufferManager().getCompletedReadListSize();
        abfsInputStream.read(new byte[1024]);
        verifyReadCallCount(mockAbfsClient, 3);
        Assertions.assertThat(ReadBufferManager.getBufferManager().getCompletedReadListSize() - completedReadListSize).describedAs("New additions to completed reads should be same or less than as number of readaheads", new Object[0]).isLessThanOrEqualTo(3);
        abfsInputStream.read(1024L, new byte[1024], 0, 1024);
        verifyReadCallCount(mockAbfsClient, 3);
        checkEvictedStatus(abfsInputStream, 0, true);
    }

    @Test
    public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
        AbfsClient mockAbfsClient = getMockAbfsClient();
        AbfsRestOperation mockRestOp = getMockRestOp();
        long j = 3000L;
        Long l = 1000L;
        Integer num = 3;
        ((AbfsClient) Mockito.doAnswer(invocationOnMock -> {
            Thread.sleep(j.longValue());
            return mockRestOp;
        }).when(mockAbfsClient)).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
        int numBuffers = bufferManager.getNumBuffers() - num.intValue();
        AbfsInputStream abfsInputStream = getAbfsInputStream(mockAbfsClient, "testSuccessfulReadAhead.txt");
        try {
            queueReadAheads(abfsInputStream);
            Thread.sleep(l.longValue());
            Assertions.assertThat(bufferManager.getInProgressCopiedList()).describedAs(String.format("InProgressList should have %d elements", num), new Object[0]).hasSize(num.intValue());
            Assertions.assertThat(bufferManager.getFreeListCopy()).describedAs(String.format("FreeList should have %d elements", Integer.valueOf(numBuffers)), new Object[0]).hasSize(numBuffers);
            Assertions.assertThat(bufferManager.getCompletedReadListCopy()).describedAs("CompletedList should have 0 elements", new Object[0]).hasSize(0);
            if (abfsInputStream != null) {
                abfsInputStream.close();
            }
            Assertions.assertThat(bufferManager.getInProgressCopiedList()).describedAs(String.format("InProgressList should have %d elements", num), new Object[0]).hasSize(num.intValue());
            Assertions.assertThat(bufferManager.getFreeListCopy()).describedAs(String.format("FreeList should have %d elements", Integer.valueOf(numBuffers)), new Object[0]).hasSize(numBuffers);
            Assertions.assertThat(bufferManager.getCompletedReadListCopy()).describedAs("CompletedList should have 0 elements", new Object[0]).hasSize(0);
        } catch (Throwable th) {
            if (abfsInputStream != null) {
                try {
                    abfsInputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testReadAheadManagerForFailedReadAhead() throws Exception {
        AbfsClient mockAbfsClient = getMockAbfsClient();
        ((AbfsClient) Mockito.doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Thread-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Thread-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error RAH-Thread-Z")}).doReturn(getMockRestOp()).when(mockAbfsClient)).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream abfsInputStream = getAbfsInputStream(mockAbfsClient, "testReadAheadManagerForFailedReadAhead.txt");
        queueReadAheads(abfsInputStream);
        Thread.sleep(1000L);
        LambdaTestUtils.intercept(IOException.class, () -> {
            return Integer.valueOf(ReadBufferManager.getBufferManager().getBlock(abfsInputStream, 0L, 1024, new byte[1024]));
        });
        verifyReadCallCount(mockAbfsClient, 3);
        checkEvictedStatus(abfsInputStream, 0, false);
    }

    @Test
    public void testReadAheadManagerForOlderReadAheadFailure() throws Exception {
        AbfsClient mockAbfsClient = getMockAbfsClient();
        AbfsRestOperation mockRestOp = getMockRestOp();
        ((AbfsClient) Mockito.doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doReturn(mockRestOp).doReturn(mockRestOp).when(mockAbfsClient)).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream abfsInputStream = getAbfsInputStream(mockAbfsClient, "testReadAheadManagerForOlderReadAheadFailure.txt");
        queueReadAheads(abfsInputStream);
        Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
        verifyReadCallCount(mockAbfsClient, 3);
        Assert.assertEquals("bytesRead should be zero when previously read ahead buffer had failed", 0L, ReadBufferManager.getBufferManager().getBlock(abfsInputStream, 1024L, 1024, new byte[1024]));
        checkEvictedStatus(abfsInputStream, 0, false);
    }

    @Test
    public void testReadAheadManagerForSuccessfulReadAhead() throws Exception {
        AbfsClient mockAbfsClient = getMockAbfsClient();
        AbfsRestOperation mockRestOp = getMockRestOp();
        ((AbfsClient) Mockito.doReturn(mockRestOp).doReturn(mockRestOp).doReturn(mockRestOp).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Z")}).when(mockAbfsClient)).read((String) ArgumentMatchers.any(String.class), ((Long) ArgumentMatchers.any(Long.class)).longValue(), (byte[]) ArgumentMatchers.any(byte[].class), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), ((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (String) ArgumentMatchers.any(String.class), (String) ArgumentMatchers.any(String.class), (TracingContext) ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream abfsInputStream = getAbfsInputStream(mockAbfsClient, "testSuccessfulReadAhead.txt");
        queueReadAheads(abfsInputStream);
        Thread.sleep(1000L);
        verifyReadCallCount(mockAbfsClient, 3);
        Assert.assertTrue("bytesRead should be non-zero from the buffer that was read-ahead", ReadBufferManager.getBufferManager().getBlock(abfsInputStream, 1024L, 1024, new byte[1024]) > 0);
        verifyReadCallCount(mockAbfsClient, 3);
        checkEvictedStatus(abfsInputStream, 0, true);
    }

    @Test
    public void testDiffReadRequestSizeAndRAHBlockSize() throws Exception {
        resetReadBufferManager(FOUR_MB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
        testReadAheadConfigs(FOUR_MB, TEST_READAHEAD_DEPTH_4, false, EIGHT_MB);
        resetReadBufferManager(16384, INCREASED_READ_BUFFER_AGE_THRESHOLD);
        testReadAheads(testReadAheadConfigs(16384, 2, true, 16384), 16384, 16384);
        resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
        testReadAheads(testReadAheadConfigs(16384, 2, true, FORTY_EIGHT_KB), 16384, FORTY_EIGHT_KB);
        resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
        testReadAheads(testReadAheadConfigs(FORTY_EIGHT_KB, 2, true, 16384), FORTY_EIGHT_KB, 16384);
    }

    @Test
    public void testDefaultReadaheadQueueDepth() throws Exception {
        Configuration rawConfiguration = getRawConfiguration();
        rawConfiguration.unset("fs.azure.readaheadqueue.depth");
        AzureBlobFileSystem fileSystem = getFileSystem(rawConfiguration);
        Path path = path("/testFile");
        fileSystem.create(path).close();
        FSDataInputStream open = fileSystem.open(path);
        Assertions.assertThat(open.getWrappedStream().getReadAheadQueueDepth()).describedAs("readahead queue depth should be set to default value 2", new Object[0]).isEqualTo(2);
        open.close();
    }

    private void testReadAheads(AbfsInputStream abfsInputStream, int i, int i2) throws Exception {
        if (i > i2) {
            i2 = i;
        }
        byte[] bArr = new byte[i];
        byte[] bArr2 = new byte[i2];
        byte[] bArr3 = new byte[i];
        byte[] bArr4 = new byte[i2];
        getExpectedBufferData(0, i, bArr3);
        getExpectedBufferData(i, i2, bArr4);
        Assertions.assertThat(abfsInputStream.read(bArr, 0, i)).describedAs("Read should be of exact requested size", new Object[0]).isEqualTo(i);
        assertTrue("Data mismatch found in RAH1", Arrays.equals(bArr, bArr3));
        Assertions.assertThat(abfsInputStream.read(bArr2, 0, i2)).describedAs("Read should be of exact requested size", new Object[0]).isEqualTo(i2);
        assertTrue("Data mismatch found in RAH2", Arrays.equals(bArr2, bArr4));
    }

    public AbfsInputStream testReadAheadConfigs(int i, int i2, boolean z, int i3) throws Exception {
        Configuration configuration = new Configuration(getRawConfiguration());
        configuration.set("fs.azure.read.request.size", Integer.toString(i));
        configuration.set("fs.azure.readaheadqueue.depth", Integer.toString(i2));
        configuration.set("fs.azure.read.alwaysReadBufferSize", Boolean.toString(z));
        configuration.set("fs.azure.read.readahead.blocksize", Integer.toString(i3));
        if (i > i3) {
            i3 = i;
        }
        Path path = path("/testReadAheadConfigs");
        AzureBlobFileSystem createTestFile = createTestFile(path, 16777216L, configuration);
        byte[] bArr = new byte[1048576];
        AbfsInputStream openFileForRead = getAbfsStore(createTestFile).openFileForRead(path, (FileSystem.Statistics) null, getTestTracingContext(createTestFile, false));
        Assertions.assertThat(openFileForRead.getBufferSize()).describedAs("Unexpected AbfsInputStream buffer size", new Object[0]).isEqualTo(i);
        Assertions.assertThat(openFileForRead.getReadAheadQueueDepth()).describedAs("Unexpected ReadAhead queue depth", new Object[0]).isEqualTo(i2);
        Assertions.assertThat(openFileForRead.shouldAlwaysReadBufferSize()).describedAs("Unexpected AlwaysReadBufferSize settings", new Object[0]).isEqualTo(z);
        Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize()).describedAs("Unexpected readAhead block size", new Object[0]).isEqualTo(i3);
        return openFileForRead;
    }

    private void getExpectedBufferData(int i, int i2, byte[] bArr) {
        boolean z = false;
        int i3 = 0;
        char c = 'a';
        for (int i4 = 0; i4 < i + i2; i4++) {
            if (i4 == i) {
                z = true;
            }
            if (z && i3 < i2) {
                bArr[i3] = (byte) c;
                i3++;
            }
            c = c == 'z' ? 'a' : (char) (c + 1);
        }
    }

    private AzureBlobFileSystem createTestFile(Path path, long j, Configuration configuration) throws Exception {
        AzureBlobFileSystem fileSystem = configuration == null ? getFileSystem() : (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), configuration);
        if (fileSystem.exists(path) && fileSystem.getFileStatus(path).getLen() >= j) {
            return fileSystem;
        }
        byte[] bArr = new byte[EIGHT_MB];
        char c = 'a';
        for (int i = 0; i < bArr.length; i++) {
            bArr[i] = (byte) c;
            c = c == 'z' ? 'a' : (char) (c + 1);
        }
        FSDataOutputStream create = fileSystem.create(path);
        for (int i2 = 0; i2 < j; i2 += bArr.length) {
            try {
                create.write(bArr);
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (create != null) {
            create.close();
        }
        Assertions.assertThat(fileSystem.getFileStatus(path).getLen()).describedAs("File not created of expected size", new Object[0]).isEqualTo(j);
        return fileSystem;
    }

    private void resetReadBufferManager(int i, int i2) {
        ReadBufferManager.getBufferManager().testResetReadBufferManager(i, i2);
        System.gc();
    }
}
