/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType;
import org.apache.hadoop.fs.azurebfs.services.ReadBuffer;
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManager;
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestAbfsInputStream
extends AbstractAbfsIntegrationTest {
    private static final int ONE_KB = 1024;
    private static final int TWO_KB = 2048;
    private static final int THREE_KB = 3072;
    private static final int SIXTEEN_KB = 16384;
    private static final int FORTY_EIGHT_KB = 49152;
    private static final int ONE_MB = 0x100000;
    private static final int FOUR_MB = 0x400000;
    private static final int EIGHT_MB = 0x800000;
    private static final int TEST_READAHEAD_DEPTH_2 = 2;
    private static final int TEST_READAHEAD_DEPTH_4 = 4;
    private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000;
    private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD = 30000;
    private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 0x1000000;

    @Override
    public void teardown() throws Exception {
        super.teardown();
        ReadBufferManager.getBufferManager().testResetReadBufferManager();
    }

    private AbfsRestOperation getMockRestOp() {
        AbfsRestOperation op = (AbfsRestOperation)Mockito.mock(AbfsRestOperation.class);
        AbfsHttpOperation httpOp = (AbfsHttpOperation)Mockito.mock(AbfsHttpOperation.class);
        Mockito.when((Object)httpOp.getBytesReceived()).thenReturn((Object)1024L);
        Mockito.when((Object)op.getResult()).thenReturn((Object)httpOp);
        Mockito.when((Object)op.getSasToken()).thenReturn((Object)TestCachedSASToken.getTestCachedSASTokenInstance().get());
        return op;
    }

    private AbfsClient getMockAbfsClient() {
        AbfsClient client = (AbfsClient)Mockito.mock(AbfsClient.class);
        AbfsPerfTracker tracker = new AbfsPerfTracker("test", this.getAccountName(), this.getConfiguration());
        Mockito.when((Object)client.getAbfsPerfTracker()).thenReturn((Object)tracker);
        return client;
    }

    private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) throws IOException {
        AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1L);
        AbfsInputStream inputStream = new AbfsInputStream(mockAbfsClient, null, "/" + fileName, 3072L, inputStreamContext.withReadBufferSize(1024).withReadAheadQueueDepth(10).withReadAheadBlockSize(1024), "eTag", this.getTestTracingContext(null, false));
        inputStream.setCachedSasToken(TestCachedSASToken.getTestCachedSASTokenInstance());
        return inputStream;
    }

    public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, String fileName, int fileSize, String eTag, int readAheadQueueDepth, int readBufferSize, boolean alwaysReadBufferSize, int readAheadBlockSize) throws IOException {
        AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1L);
        AbfsInputStream inputStream = new AbfsInputStream(abfsClient, null, "/" + fileName, (long)fileSize, inputStreamContext.withReadBufferSize(readBufferSize).withReadAheadQueueDepth(readAheadQueueDepth).withShouldReadBufferSizeAlways(alwaysReadBufferSize).withReadAheadBlockSize(readAheadBlockSize), eTag, this.getTestTracingContext(this.getFileSystem(), false));
        inputStream.setCachedSasToken(TestCachedSASToken.getTestCachedSASTokenInstance());
        return inputStream;
    }

    private void queueReadAheads(AbfsInputStream inputStream) {
        ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0L, 1024, inputStream.getTracingContext());
        ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 1024L, 1024, inputStream.getTracingContext());
        ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 2048L, 2048, inputStream.getTracingContext());
    }

    private void verifyReadCallCount(AbfsClient client, int count) throws IOException, InterruptedException {
        Thread.sleep(1000L);
        ((AbfsClient)Mockito.verify((Object)client, (VerificationMode)Mockito.times((int)count))).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.any(), (TracingContext)ArgumentMatchers.any(TracingContext.class));
    }

    private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException) throws Exception {
        Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000);
        for (int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize(); numOfCompletedReadListItems > 0; --numOfCompletedReadListItems) {
            ReadBufferManager.getBufferManager().callTryEvict();
        }
        if (expectedToThrowException) {
            LambdaTestUtils.intercept(IOException.class, () -> inputStream.read((long)position, new byte[1024], 0, 1024));
        } else {
            inputStream.read((long)position, new byte[1024], 0, 1024);
        }
    }

    public TestAbfsInputStream() throws Exception {
        ReadBufferManager.getBufferManager();
        ReadBufferManager.setThresholdAgeMilliseconds((int)3000);
    }

    private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException {
        AzureBlobFileSystem fs = this.getFileSystem();
        fs.create(testFile);
        FSDataOutputStream out = fs.append(testFile);
        out.write(buffer);
        out.close();
    }

    private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, byte[] buf, AbfsRestOperationType source) throws IOException, ExecutionException, InterruptedException {
        byte[] readBuf = new byte[buf.length];
        AzureBlobFileSystem fs = this.getFileSystem();
        FutureDataInputStreamBuilder builder = fs.openFile(path);
        builder.withFileStatus(fileStatus);
        FSDataInputStream in = (FSDataInputStream)builder.build().get();
        TestAbfsInputStream.assertEquals((String)String.format("Open with fileStatus [from %s result]: Incorrect number of bytes read", source), (long)buf.length, (long)in.read(readBuf));
        TestAbfsInputStream.assertArrayEquals((String)String.format("Open with fileStatus [from %s result]: Incorrect read data", source), (byte[])readBuf, (byte[])buf);
    }

    private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, AzureBlobFileSystemStore abfsStore, AbfsClient mockClient, AbfsRestOperationType source, TracingContext tracingContext) throws IOException {
        abfsStore.openFileForRead(testFile, Optional.ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext);
        ((AbfsClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.times((int)0).description(String.format("FileStatus [from %s result] provided, GetFileStatus should not be invoked", source)))).getPathStatus(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), (TracingContext)ArgumentMatchers.any(TracingContext.class), (ContextEncryptionAdapter)ArgumentMatchers.any(ContextEncryptionAdapter.class));
        abfsStore.openFileForRead(testFile, Optional.empty(), null, tracingContext);
        ((AbfsClient)Mockito.verify((Object)mockClient, (VerificationMode)Mockito.times((int)1).description("GetPathStatus should be invoked when FileStatus not provided"))).getPathStatus(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), (TracingContext)ArgumentMatchers.any(TracingContext.class), (ContextEncryptionAdapter)ArgumentMatchers.nullable(ContextEncryptionAdapter.class));
        Mockito.reset((Object[])new AbfsClient[]{mockClient});
    }

    @Test
    public void testOpenFileWithOptions() throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        String testFolder = "/testFolder";
        Path smallTestFile = new Path(testFolder + "/testFile0");
        Path largeTestFile = new Path(testFolder + "/testFile1");
        fs.mkdirs(new Path(testFolder));
        int readBufferSize = this.getConfiguration().getReadBufferSize();
        byte[] smallBuffer = new byte[5];
        byte[] largeBuffer = new byte[readBufferSize + 5];
        new Random().nextBytes(smallBuffer);
        new Random().nextBytes(largeBuffer);
        this.writeBufferToNewFile(smallTestFile, smallBuffer);
        this.writeBufferToNewFile(largeTestFile, largeBuffer);
        FileStatus[] getFileStatusResults = new FileStatus[]{fs.getFileStatus(smallTestFile), fs.getFileStatus(largeTestFile)};
        FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder));
        this.verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0], smallBuffer, AbfsRestOperationType.GetPathStatus);
        this.verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1], largeBuffer, AbfsRestOperationType.GetPathStatus);
        this.verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer, AbfsRestOperationType.ListPaths);
        this.verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer, AbfsRestOperationType.ListPaths);
        AzureBlobFileSystemStore abfsStore = this.getAbfsStore(fs);
        AbfsClient mockClient = (AbfsClient)Mockito.spy((Object)this.getAbfsClient(abfsStore));
        this.setAbfsClient(abfsStore, mockClient);
        TracingContext tracingContext = this.getTestTracingContext(fs, false);
        this.checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0], abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
        this.checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1], abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
        this.checkGetPathStatusCalls(smallTestFile, listStatusResults[0], abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
        this.checkGetPathStatusCalls(largeTestFile, listStatusResults[1], abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
        getFileStatusResults[0].setPath(new Path("wrongPath"));
        LambdaTestUtils.intercept(ExecutionException.class, () -> this.verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0], smallBuffer, AbfsRestOperationType.GetPathStatus));
    }

    @Test
    public void testFailedReadAhead() throws Exception {
        AbfsClient client = this.getMockAbfsClient();
        AbfsRestOperation successOp = this.getMockRestOp();
        ((AbfsClient)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException("Internal Server error for RAH-Thread-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Thread-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error RAH-Thread-Z")}).doReturn((Object)successOp).when((Object)client)).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.any(), (TracingContext)ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream inputStream = this.getAbfsInputStream(client, "testFailedReadAhead.txt");
        LambdaTestUtils.intercept(IOException.class, () -> inputStream.read(new byte[1024]));
        this.verifyReadCallCount(client, 3);
        this.checkEvictedStatus(inputStream, 0, false);
    }

    @Test
    public void testFailedReadAheadEviction() throws Exception {
        AbfsClient client = this.getMockAbfsClient();
        AbfsRestOperation successOp = this.getMockRestOp();
        ReadBufferManager.setThresholdAgeMilliseconds((int)30000);
        ((AbfsClient)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException("Internal Server error")}).when((Object)client)).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.any(), (TracingContext)ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream inputStream = this.getAbfsInputStream(client, "testFailedReadAheadEviction.txt");
        ReadBuffer buff = new ReadBuffer();
        buff.setStatus(ReadBufferStatus.READ_FAILED);
        ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff);
        ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0L, 1024, this.getTestTracingContext(this.getFileSystem(), true));
    }

    @Test
    public void testOlderReadAheadFailure() throws Exception {
        AbfsClient client = this.getMockAbfsClient();
        AbfsRestOperation successOp = this.getMockRestOp();
        ((AbfsClient)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Z")}).doReturn((Object)successOp).doReturn((Object)successOp).when((Object)client)).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.any(), (TracingContext)ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream inputStream = this.getAbfsInputStream(client, "testOlderReadAheadFailure.txt");
        LambdaTestUtils.intercept(IOException.class, () -> inputStream.read(new byte[1024]));
        this.verifyReadCallCount(client, 3);
        Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
        inputStream.read(1024L, new byte[1024], 0, 1024);
        this.verifyReadCallCount(client, 4);
        this.checkEvictedStatus(inputStream, 0, false);
    }

    @Test
    public void testSuccessfulReadAhead() throws Exception {
        AbfsClient client = this.getMockAbfsClient();
        AbfsRestOperation op = this.getMockRestOp();
        ((AbfsClient)Mockito.doReturn((Object)op).doReturn((Object)op).doReturn((Object)op).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Z")}).when((Object)client)).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.any(), (TracingContext)ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream inputStream = this.getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
        int beforeReadCompletedListSize = ReadBufferManager.getBufferManager().getCompletedReadListSize();
        inputStream.read(new byte[1024]);
        this.verifyReadCallCount(client, 3);
        int newAdditionsToCompletedRead = ReadBufferManager.getBufferManager().getCompletedReadListSize() - beforeReadCompletedListSize;
        ((AbstractIntegerAssert)Assertions.assertThat((int)newAdditionsToCompletedRead).describedAs("New additions to completed reads should be same or less than as number of readaheads", new Object[0])).isLessThanOrEqualTo(3);
        inputStream.read(1024L, new byte[1024], 0, 1024);
        this.verifyReadCallCount(client, 3);
        this.checkEvictedStatus(inputStream, 0, true);
    }

    @Test
    public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
        AbfsClient client = this.getMockAbfsClient();
        AbfsRestOperation successOp = this.getMockRestOp();
        Long serverCommunicationMockLatency = 3000L;
        Long readBufferTransferToInProgressProbableTime = 1000L;
        Integer readBufferQueuedCount = 3;
        ((AbfsClient)Mockito.doAnswer(invocationOnMock -> {
            Thread.sleep(serverCommunicationMockLatency);
            return successOp;
        }).when((Object)client)).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.nullable(ContextEncryptionAdapter.class), (TracingContext)ArgumentMatchers.any(TracingContext.class));
        ReadBufferManager readBufferManager = ReadBufferManager.getBufferManager();
        int readBufferTotal = readBufferManager.getNumBuffers();
        int expectedFreeListBufferCount = readBufferTotal - readBufferQueuedCount;
        try (AbfsInputStream inputStream = this.getAbfsInputStream(client, "testSuccessfulReadAhead.txt");){
            this.queueReadAheads(inputStream);
            Thread.sleep(readBufferTransferToInProgressProbableTime);
            ((ListAssert)Assertions.assertThat((List)readBufferManager.getInProgressCopiedList()).describedAs(String.format("InProgressList should have %d elements", readBufferQueuedCount), new Object[0])).hasSize(readBufferQueuedCount.intValue());
            ((ListAssert)Assertions.assertThat((List)readBufferManager.getFreeListCopy()).describedAs(String.format("FreeList should have %d elements", expectedFreeListBufferCount), new Object[0])).hasSize(expectedFreeListBufferCount);
            ((ListAssert)Assertions.assertThat((List)readBufferManager.getCompletedReadListCopy()).describedAs("CompletedList should have 0 elements", new Object[0])).hasSize(0);
        }
        ((ListAssert)Assertions.assertThat((List)readBufferManager.getInProgressCopiedList()).describedAs(String.format("InProgressList should have %d elements", readBufferQueuedCount), new Object[0])).hasSize(readBufferQueuedCount.intValue());
        ((ListAssert)Assertions.assertThat((List)readBufferManager.getFreeListCopy()).describedAs(String.format("FreeList should have %d elements", expectedFreeListBufferCount), new Object[0])).hasSize(expectedFreeListBufferCount);
        ((ListAssert)Assertions.assertThat((List)readBufferManager.getCompletedReadListCopy()).describedAs("CompletedList should have 0 elements", new Object[0])).hasSize(0);
    }

    @Test
    public void testReadAheadManagerForFailedReadAhead() throws Exception {
        AbfsClient client = this.getMockAbfsClient();
        AbfsRestOperation successOp = this.getMockRestOp();
        ((AbfsClient)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException("Internal Server error for RAH-Thread-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Thread-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error RAH-Thread-Z")}).doReturn((Object)successOp).when((Object)client)).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.any(), (TracingContext)ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream inputStream = this.getAbfsInputStream(client, "testReadAheadManagerForFailedReadAhead.txt");
        this.queueReadAheads(inputStream);
        Thread.sleep(1000L);
        LambdaTestUtils.intercept(IOException.class, () -> ReadBufferManager.getBufferManager().getBlock(inputStream, 0L, 1024, new byte[1024]));
        this.verifyReadCallCount(client, 3);
        this.checkEvictedStatus(inputStream, 0, false);
    }

    @Test
    public void testReadAheadManagerForOlderReadAheadFailure() throws Exception {
        AbfsClient client = this.getMockAbfsClient();
        AbfsRestOperation successOp = this.getMockRestOp();
        ((AbfsClient)Mockito.doThrow((Throwable[])new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doReturn((Object)successOp).doReturn((Object)successOp).when((Object)client)).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.any(), (TracingContext)ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream inputStream = this.getAbfsInputStream(client, "testReadAheadManagerForOlderReadAheadFailure.txt");
        this.queueReadAheads(inputStream);
        Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
        this.verifyReadCallCount(client, 3);
        int bytesRead = ReadBufferManager.getBufferManager().getBlock(inputStream, 1024L, 1024, new byte[1024]);
        Assert.assertEquals((String)"bytesRead should be zero when previously read ahead buffer had failed", (long)0L, (long)bytesRead);
        this.checkEvictedStatus(inputStream, 0, false);
    }

    @Test
    public void testReadAheadManagerForSuccessfulReadAhead() throws Exception {
        AbfsClient client = this.getMockAbfsClient();
        AbfsRestOperation op = this.getMockRestOp();
        ((AbfsClient)Mockito.doReturn((Object)op).doReturn((Object)op).doReturn((Object)op).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-X")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Y")}).doThrow(new Throwable[]{new TimeoutException("Internal Server error for RAH-Z")}).when((Object)client)).read((String)ArgumentMatchers.any(String.class), ((Long)ArgumentMatchers.any(Long.class)).longValue(), (byte[])ArgumentMatchers.any(byte[].class), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), ((Integer)ArgumentMatchers.any(Integer.class)).intValue(), (String)ArgumentMatchers.any(String.class), (String)ArgumentMatchers.any(String.class), (ContextEncryptionAdapter)ArgumentMatchers.any(), (TracingContext)ArgumentMatchers.any(TracingContext.class));
        AbfsInputStream inputStream = this.getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
        this.queueReadAheads(inputStream);
        Thread.sleep(1000L);
        this.verifyReadCallCount(client, 3);
        int bytesRead = ReadBufferManager.getBufferManager().getBlock(inputStream, 1024L, 1024, new byte[1024]);
        Assert.assertTrue((String)"bytesRead should be non-zero from the buffer that was read-ahead", (bytesRead > 0 ? 1 : 0) != 0);
        this.verifyReadCallCount(client, 3);
        this.checkEvictedStatus(inputStream, 0, true);
    }

    @Test
    public void testDiffReadRequestSizeAndRAHBlockSize() throws Exception {
        this.resetReadBufferManager(0x400000, 30000);
        this.testReadAheadConfigs(0x400000, 4, false, 0x800000);
        this.resetReadBufferManager(16384, 30000);
        AbfsInputStream inputStream = this.testReadAheadConfigs(16384, 2, true, 16384);
        this.testReadAheads(inputStream, 16384, 16384);
        this.resetReadBufferManager(49152, 30000);
        inputStream = this.testReadAheadConfigs(16384, 2, true, 49152);
        this.testReadAheads(inputStream, 16384, 49152);
        this.resetReadBufferManager(49152, 30000);
        inputStream = this.testReadAheadConfigs(49152, 2, true, 16384);
        this.testReadAheads(inputStream, 49152, 16384);
    }

    @Test
    public void testDefaultReadaheadQueueDepth() throws Exception {
        Configuration config = this.getRawConfiguration();
        config.unset("fs.azure.readaheadqueue.depth");
        AzureBlobFileSystem fs = this.getFileSystem(config);
        Path testFile = this.path("/testFile");
        fs.create(testFile).close();
        FSDataInputStream in = fs.open(testFile);
        ((AbstractIntegerAssert)Assertions.assertThat((int)((AbfsInputStream)in.getWrappedStream()).getReadAheadQueueDepth()).describedAs("readahead queue depth should be set to default value 2", new Object[0])).isEqualTo(2);
        in.close();
    }

    private void testReadAheads(AbfsInputStream inputStream, int readRequestSize, int readAheadRequestSize) throws Exception {
        if (readRequestSize > readAheadRequestSize) {
            readAheadRequestSize = readRequestSize;
        }
        byte[] firstReadBuffer = new byte[readRequestSize];
        byte[] secondReadBuffer = new byte[readAheadRequestSize];
        byte[] expectedFirstReadAheadBufferContents = new byte[readRequestSize];
        byte[] expectedSecondReadAheadBufferContents = new byte[readAheadRequestSize];
        this.getExpectedBufferData(0, readRequestSize, expectedFirstReadAheadBufferContents);
        this.getExpectedBufferData(readRequestSize, readAheadRequestSize, expectedSecondReadAheadBufferContents);
        ((AbstractIntegerAssert)Assertions.assertThat((int)inputStream.read(firstReadBuffer, 0, readRequestSize)).describedAs("Read should be of exact requested size", new Object[0])).isEqualTo(readRequestSize);
        TestAbfsInputStream.assertTrue((String)"Data mismatch found in RAH1", (boolean)Arrays.equals(firstReadBuffer, expectedFirstReadAheadBufferContents));
        ((AbstractIntegerAssert)Assertions.assertThat((int)inputStream.read(secondReadBuffer, 0, readAheadRequestSize)).describedAs("Read should be of exact requested size", new Object[0])).isEqualTo(readAheadRequestSize);
        TestAbfsInputStream.assertTrue((String)"Data mismatch found in RAH2", (boolean)Arrays.equals(secondReadBuffer, expectedSecondReadAheadBufferContents));
    }

    public AbfsInputStream testReadAheadConfigs(int readRequestSize, int readAheadQueueDepth, boolean alwaysReadBufferSizeEnabled, int readAheadBlockSize) throws Exception {
        Configuration config = new Configuration(this.getRawConfiguration());
        config.set("fs.azure.read.request.size", Integer.toString(readRequestSize));
        config.set("fs.azure.readaheadqueue.depth", Integer.toString(readAheadQueueDepth));
        config.set("fs.azure.read.alwaysReadBufferSize", Boolean.toString(alwaysReadBufferSizeEnabled));
        config.set("fs.azure.read.readahead.blocksize", Integer.toString(readAheadBlockSize));
        if (readRequestSize > readAheadBlockSize) {
            readAheadBlockSize = readRequestSize;
        }
        Path testPath = this.path("/testReadAheadConfigs");
        AzureBlobFileSystem fs = this.createTestFile(testPath, 0x1000000L, config);
        byte[] byteBuffer = new byte[0x100000];
        AbfsInputStream inputStream = this.getAbfsStore(fs).openFileForRead(testPath, null, this.getTestTracingContext(fs, false));
        ((AbstractIntegerAssert)Assertions.assertThat((int)inputStream.getBufferSize()).describedAs("Unexpected AbfsInputStream buffer size", new Object[0])).isEqualTo(readRequestSize);
        ((AbstractIntegerAssert)Assertions.assertThat((int)inputStream.getReadAheadQueueDepth()).describedAs("Unexpected ReadAhead queue depth", new Object[0])).isEqualTo(readAheadQueueDepth);
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)inputStream.shouldAlwaysReadBufferSize()).describedAs("Unexpected AlwaysReadBufferSize settings", new Object[0])).isEqualTo(alwaysReadBufferSizeEnabled);
        ((AbstractIntegerAssert)Assertions.assertThat((int)ReadBufferManager.getBufferManager().getReadAheadBlockSize()).describedAs("Unexpected readAhead block size", new Object[0])).isEqualTo(readAheadBlockSize);
        return inputStream;
    }

    private void getExpectedBufferData(int offset, int length, byte[] b) {
        boolean startFillingIn = false;
        int indexIntoBuffer = 0;
        int character = 97;
        for (int i = 0; i < offset + length; ++i) {
            if (i == offset) {
                startFillingIn = true;
            }
            if (startFillingIn && indexIntoBuffer < length) {
                b[indexIntoBuffer] = (byte)character;
                ++indexIntoBuffer;
            }
            character = (char)(character == 122 ? 97 : (char)(character + 1));
        }
    }

    private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize, Configuration config) throws Exception {
        FileStatus status;
        AzureBlobFileSystem fs;
        if (config == null) {
            fs = this.getFileSystem();
        } else {
            AzureBlobFileSystem currentFs = this.getFileSystem();
            fs = (AzureBlobFileSystem)FileSystem.newInstance((URI)currentFs.getUri(), (Configuration)config);
        }
        if (fs.exists(testFilePath) && (status = fs.getFileStatus(testFilePath)).getLen() >= testFileSize) {
            return fs;
        }
        byte[] buffer = new byte[0x800000];
        int character = 97;
        for (int i = 0; i < buffer.length; ++i) {
            buffer[i] = (byte)character;
            character = (char)(character == 122 ? 97 : (char)(character + 1));
        }
        try (FSDataOutputStream outputStream = fs.create(testFilePath);){
            int bytesWritten = 0;
            while ((long)bytesWritten < testFileSize) {
                outputStream.write(buffer);
                bytesWritten += buffer.length;
            }
        }
        ((AbstractLongAssert)Assertions.assertThat((long)fs.getFileStatus(testFilePath).getLen()).describedAs("File not created of expected size", new Object[0])).isEqualTo(testFileSize);
        return fs;
    }

    private void resetReadBufferManager(int bufferSize, int threshold) {
        ReadBufferManager.getBufferManager().testResetReadBufferManager(bufferSize, threshold);
        System.gc();
    }
}

