/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.utils.Listener;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Integration tests for lease handling in {@link AzureBlobFileSystem}.
 *
 * <p>The tests create output streams under directories that are (or are not)
 * listed in {@code fs.azure.infinite-lease.directories} and verify whether the
 * stream holds a lease, whether the lease is released on close / break /
 * file system close, and how {@link AbfsLease} retries lease acquisition.
 */
public class ITestAzureBlobFileSystemLease
        extends AbstractAbfsIntegrationTest {
    /** Timeout, in milliseconds, applied to most tests. */
    private static final int TEST_EXECUTION_TIMEOUT = 30000;
    /** Timeout, in milliseconds, applied to the tests annotated with the longer limit. */
    private static final int LONG_TEST_EXECUTION_TIMEOUT = 90000;
    /** Name of the file each test creates under its own per-method directory. */
    private static final String TEST_FILE = "testfile";
    /** Test configuration key read to decide whether the account is namespace (HNS) enabled. */
    private static final String TEST_NAMESPACE_ENABLED_KEY = "fs.azure.test.namespace.enabled";
    /** Error text expected when a stream uses a lease that has been broken. */
    private static final String ERR_LEASE_EXPIRED =
            "A lease ID was specified, but the lease for the resource has expired";
    /** Assertion message used when the store still reports outstanding leases. */
    private static final String STORE_LEASES_NOT_FREED = "Store leases were not freed";

    // Read once at construction from the test configuration; false when the key is absent.
    // NOTE(review): this initializer runs in the implicit constructor. If the superclass
    // constructor declares checked exceptions, an explicit constructor is required - confirm.
    private final boolean isHNSEnabled = getConfiguration().getBoolean(TEST_NAMESPACE_ENABLED_KEY, false);

    /**
     * Builds a file system whose configuration enables infinite leases for a directory.
     *
     * <p>The settings are applied to the {@link Configuration} returned by
     * {@code getRawConfiguration()}: the file system cache is disabled for the ABFS
     * scheme, {@code fs.azure.infinite-lease.directories} is set to the path component
     * of {@code infiniteLeaseDirs}, and {@code fs.azure.lease.threads} is set to
     * {@code numLeaseThreads}.
     *
     * @param infiniteLeaseDirs directory to register as an infinite-lease directory
     * @param numLeaseThreads value for {@code fs.azure.lease.threads}
     * @return the file system obtained from {@code getFileSystem(conf)}
     * @throws Exception if the file system cannot be obtained
     */
    private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs, int numLeaseThreads) throws Exception {
        Configuration conf = getRawConfiguration();
        conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()), true);
        conf.set("fs.azure.infinite-lease.directories", infiniteLeaseDirs.toUri().getPath());
        conf.setInt("fs.azure.lease.threads", numLeaseThreads);
        return getFileSystem(conf);
    }

    /**
     * With the default file system (no infinite-lease directory configured by this test),
     * a created stream must not hold a lease and the store must report no leases after close.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testNoInfiniteLease() throws IOException {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getFileSystem();
        fs.mkdirs(testFilePath.getParent());
        try (FSDataOutputStream out = fs.create(testFilePath)) {
            Assert.assertFalse("Output stream should not have lease",
                    ((AbfsOutputStream) out.getWrappedStream()).hasLease());
        }
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * Creating a file in an infinite-lease directory with zero lease threads configured
     * must fail with an {@link IOException} carrying the "no lease threads" message.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testNoLeaseThreads() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 0);
        fs.mkdirs(testFilePath.getParent());
        LambdaTestUtils.intercept(IOException.class,
                "Lease desired but no lease threads configured, set fs.azure.lease.threads", () -> {
                    try (FSDataOutputStream out = fs.create(testFilePath)) {
                        // Nothing to write: the create call itself is expected to fail.
                    }
                    return "No failure when lease requested with 0 lease threads";
                });
    }

    /**
     * A single writer in an infinite-lease directory holds a lease while open and
     * no longer holds it after close.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testOneWriter() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());

        FSDataOutputStream out = fs.create(testFilePath);
        Assert.assertTrue("Output stream should have lease",
                ((AbfsOutputStream) out.getWrappedStream()).hasLease());
        out.close();
        Assert.assertFalse("Output stream should not have lease",
                ((AbfsOutputStream) out.getWrappedStream()).hasLease());
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * Same as {@link #testOneWriter()}, but the file lives in a sub-directory of the
     * directory registered as the infinite-lease directory.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testSubDir() throws Exception {
        final Path testFilePath = new Path(new Path(path(methodName.getMethodName()), "subdir"), TEST_FILE);
        // The lease directory is the grandparent of the file; only that directory is created up front.
        final Path leaseDir = testFilePath.getParent().getParent();
        final AzureBlobFileSystem fs = getCustomFileSystem(leaseDir, 1);
        fs.mkdirs(leaseDir);

        FSDataOutputStream out = fs.create(testFilePath);
        Assert.assertTrue("Output stream should have lease",
                ((AbfsOutputStream) out.getWrappedStream()).hasLease());
        out.close();
        Assert.assertFalse("Output stream should not have lease",
                ((AbfsOutputStream) out.getWrappedStream()).hasLease());
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * A second create on a file that is already open in an infinite-lease directory must
     * fail. The expected error text depends on whether the account is namespace enabled.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testTwoCreate() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        assumeValidTestConfigPresent(getRawConfiguration(), TEST_NAMESPACE_ENABLED_KEY);
        fs.mkdirs(testFilePath.getParent());

        final String expectedError = isHNSEnabled
                ? "Parallel access to the create path detected. Failing request to honor single writer semantics"
                : "There is currently a lease on the resource and no lease ID was specified in the request";
        try (FSDataOutputStream out = fs.create(testFilePath)) {
            LambdaTestUtils.intercept(IOException.class, expectedError, () -> {
                try (FSDataOutputStream out2 = fs.create(testFilePath)) {
                    // Nothing to write: the second create is expected to fail.
                }
                return "Expected second create on infinite lease dir to fail";
            });
        }
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * Opens a create stream and, while it is open, an append stream on the same file.
     *
     * <p>If the append side throws an {@link IOException}: when {@code expectException}
     * is true the message must contain "Unable to acquire lease" and the exception is
     * consumed; otherwise it is rethrown. The first writer then writes and syncs, and
     * after both streams are closed the store must report no outstanding leases.
     *
     * @param fs file system under test
     * @param testFilePath file both writers target
     * @param expectException whether a lease-acquisition failure on append is acceptable
     * @throws Exception on any unexpected failure
     */
    private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception {
        try (FSDataOutputStream out = fs.create(testFilePath)) {
            try (FSDataOutputStream out2 = fs.append(testFilePath)) {
                out2.writeInt(2);
                out2.hsync();
            } catch (IOException e) {
                if (!expectException) {
                    throw e;
                }
                // Expected failure: validate it and carry on so the first writer is still exercised.
                GenericTestUtils.assertExceptionContains("Unable to acquire lease", e);
            }
            out.writeInt(1);
            out.hsync();
        }
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * Create + append on the same file with the default file system; no exception is
     * tolerated. Skipped when append blobs are enabled.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testTwoWritersCreateAppendNoInfiniteLease() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getFileSystem();
        Assume.assumeFalse("Parallel Writes Not Allowed on Append Blobs", isAppendBlobEnabled());
        fs.mkdirs(testFilePath.getParent());
        twoWriters(fs, testFilePath, false);
    }

    /**
     * Create + append on the same file inside an infinite-lease directory; a lease
     * acquisition failure on the append side is accepted. Skipped when append blobs
     * are enabled.
     */
    @Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
    public void testTwoWritersCreateAppendWithInfiniteLeaseEnabled() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        Assume.assumeFalse("Parallel Writes Not Allowed on Append Blobs", isAppendBlobEnabled());
        fs.mkdirs(testFilePath.getParent());
        twoWriters(fs, testFilePath, true);
    }

    /**
     * After writing a byte and closing the stream, neither the stream nor the store
     * may still hold a lease.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testLeaseFreedOnClose() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());

        FSDataOutputStream out = fs.create(testFilePath);
        out.write(0);
        Assert.assertTrue("Output stream should have lease",
                ((AbfsOutputStream) out.getWrappedStream()).hasLease());
        out.close();
        Assert.assertFalse("Output stream should not have lease after close",
                ((AbfsOutputStream) out.getWrappedStream()).hasLease());
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * Breaks the lease of an open stream and verifies that subsequent write+sync and
     * close both fail with the lease-expired error, that the stream's lease is marked
     * freed, and that a fresh append on the file then succeeds.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testWriteAfterBreakLease() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());

        final FSDataOutputStream out = fs.create(testFilePath);
        out.write(0);
        out.hsync();

        // The tracing validator is registered only for the duration of the breakLease call.
        fs.registerListener(new TracingHeaderValidator(
                getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
                FSOperationType.BREAK_LEASE, false, 0));
        fs.breakLease(testFilePath);
        fs.registerListener(null);

        LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
            out.write(1);
            out.hsync();
            return "Expected exception on write after lease break but got " + out;
        });
        LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
            out.close();
            return "Expected exception on close after lease break but got " + out;
        });
        Assert.assertTrue("Output stream lease should be freed",
                ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());

        try (FSDataOutputStream out2 = fs.append(testFilePath)) {
            out2.write(2);
            out2.hsync();
        }
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * Breaks the lease of a stream with unsynced data and verifies that close fails with
     * the lease-expired error while the stream and store both report the lease as freed.
     */
    @Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
    public void testLeaseFreedAfterBreak() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());

        final FSDataOutputStream out = fs.create(testFilePath);
        out.write(0);
        fs.breakLease(testFilePath);

        LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
            out.close();
            return "Expected exception on close after lease break but got " + out;
        });
        Assert.assertTrue("Output stream lease should be freed",
                ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * In an infinite-lease directory both a create stream and a later append stream hold
     * a lease while open, and the store reports no leases after each one is closed.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testInfiniteLease() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());

        try (FSDataOutputStream out = fs.create(testFilePath)) {
            Assert.assertTrue("Output stream should have lease",
                    ((AbfsOutputStream) out.getWrappedStream()).hasLease());
            out.write(0);
        }
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());

        // Each stream gets its own resource variable; the first one is out of scope here.
        try (FSDataOutputStream appendOut = fs.append(testFilePath)) {
            Assert.assertTrue("Output stream should have lease",
                    ((AbfsOutputStream) appendOut.getWrappedStream()).hasLease());
            appendOut.write(1);
        }
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());
    }

    /**
     * While a stream is open the store reports outstanding leases; after the file system
     * is closed it reports none, and a new append fails. The expected exception type is
     * {@link AbfsDriverException} when the preferred HTTP operation type is
     * {@code APACHE_HTTP_CLIENT}, and {@link RejectedExecutionException} otherwise.
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testFileSystemClose() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());

        try (FSDataOutputStream out = fs.create(testFilePath)) {
            out.write(0);
            Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed());
        }
        fs.close();
        Assert.assertTrue(STORE_LEASES_NOT_FREED, fs.getAbfsStore().areLeasesFreed());

        Callable<String> exceptionRaisingCallable = () -> {
            try (FSDataOutputStream out2 = fs.append(testFilePath)) {
                // Nothing to write: the append on a closed file system is expected to fail.
            }
            return "Expected exception on new append after closed FS";
        };
        if (getConfiguration().getPreferredHttpOperationType() == HttpOperationType.APACHE_HTTP_CLIENT) {
            LambdaTestUtils.intercept(AbfsDriverException.class, exceptionRaisingCallable);
        } else {
            LambdaTestUtils.intercept(RejectedExecutionException.class, exceptionRaisingCallable);
        }
    }

    /**
     * Exercises {@link AbfsLease} acquisition directly.
     *
     * <ol>
     *   <li>With the real client, a lease is obtained and the retry count is 0.</li>
     *   <li>With a spied client whose {@code acquireLease} throws twice and then calls the
     *       real method, a lease is still obtained and the retry count is 2.</li>
     *   <li>With the spied client always throwing, constructing the lease raises
     *       {@link AzureBlobFileSystemException}.</li>
     * </ol>
     */
    @Test(timeout = TEST_EXECUTION_TIMEOUT)
    public void testAcquireRetry() throws Exception {
        final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
        final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
        fs.mkdirs(testFilePath.getParent());
        fs.createNewFile(testFilePath);

        final TracingContext tracingContext = getTestTracingContext(fs, true);
        TracingHeaderValidator listener = new TracingHeaderValidator(
                getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
                FSOperationType.TEST_OP, true, 0);
        tracingContext.setListener(listener);

        AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath(), tracingContext);
        Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID());
        // The release request is validated against RELEASE_LEASE, then validation is switched off.
        listener.setOperation(FSOperationType.RELEASE_LEASE);
        lease.free();
        lease.getTracingContext().setListener(null);
        Assert.assertEquals("Unexpected acquire retry count", 0L, lease.getAcquireRetryCount());

        final AbfsClient mockClient = Mockito.spy(fs.getAbfsClient());
        Mockito.doThrow(new AbfsLease.LeaseException("failed to acquire 1"))
                .doThrow(new AbfsLease.LeaseException("failed to acquire 2"))
                .doCallRealMethod()
                .when(mockClient)
                .acquireLease(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(),
                        ArgumentMatchers.any(TracingContext.class));

        lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, tracingContext);
        Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID());
        lease.free();
        Assert.assertEquals("Unexpected acquire retry count", 2L, lease.getAcquireRetryCount());

        Mockito.doThrow(new AbfsLease.LeaseException("failed to acquire"))
                .when(mockClient)
                .acquireLease(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(),
                        ArgumentMatchers.any(TracingContext.class));
        LambdaTestUtils.intercept(AzureBlobFileSystemException.class,
                () -> new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, tracingContext));
    }
}

