/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsJdkHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType;
import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@RunWith(value=Parameterized.class)
public class ITestAbfsOutputStream
extends AbstractAbfsIntegrationTest {
    private static final int TEST_EXECUTION_TIMEOUT = 120000;
    private static final String TEST_FILE_PATH = "testfile";
    @Parameterized.Parameter
    public HttpOperationType httpOperationType;

    @Parameterized.Parameters(name="{0}")
    public static Iterable<Object[]> params() {
        return Arrays.asList({HttpOperationType.JDK_HTTP_URL_CONNECTION}, {HttpOperationType.APACHE_HTTP_CLIENT});
    }

    @Override
    public AzureBlobFileSystem getFileSystem(Configuration configuration) throws Exception {
        Configuration conf = new Configuration(configuration);
        conf.set("fs.azure.networking.library", this.httpOperationType.toString());
        return (AzureBlobFileSystem)FileSystem.newInstance((Configuration)conf);
    }

    @Test
    public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
        Configuration conf = this.getRawConfiguration();
        AzureBlobFileSystem fs = this.getFileSystem(conf);
        try (FSDataOutputStream out = fs.create(this.path(TEST_FILE_PATH));){
            AbfsOutputStream stream = (AbfsOutputStream)out.getWrappedStream();
            int maxConcurrentRequests = this.getConfiguration().getWriteMaxConcurrentRequestCount();
            if (stream.isAppendBlobStream().booleanValue()) {
                maxConcurrentRequests = 1;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)stream.getMaxConcurrentRequestCount()).describedAs("maxConcurrentRequests should be " + maxConcurrentRequests, new Object[0])).isEqualTo(maxConcurrentRequests);
            ((AbstractIntegerAssert)Assertions.assertThat((int)stream.getMaxRequestsThatCanBeQueued()).describedAs("maxRequestsToQueue should be " + this.getConfiguration().getMaxWriteRequestsToQueue(), new Object[0])).isEqualTo(this.getConfiguration().getMaxWriteRequestsToQueue());
        }
    }

    @Test
    public void testMaxRequestsAndQueueCapacity() throws Exception {
        Configuration conf = this.getRawConfiguration();
        int maxConcurrentRequests = 6;
        int maxRequestsToQueue = 10;
        conf.set("fs.azure.write.max.concurrent.requests", "" + maxConcurrentRequests);
        conf.set("fs.azure.write.max.requests.to.queue", "" + maxRequestsToQueue);
        AzureBlobFileSystem fs = this.getFileSystem(conf);
        try (FSDataOutputStream out = fs.create(this.path(TEST_FILE_PATH));){
            AbfsOutputStream stream = (AbfsOutputStream)out.getWrappedStream();
            if (stream.isAppendBlobStream().booleanValue()) {
                maxConcurrentRequests = 1;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)stream.getMaxConcurrentRequestCount()).describedAs("maxConcurrentRequests should be " + maxConcurrentRequests, new Object[0])).isEqualTo(maxConcurrentRequests);
            ((AbstractIntegerAssert)Assertions.assertThat((int)stream.getMaxRequestsThatCanBeQueued()).describedAs("maxRequestsToQueue should be " + maxRequestsToQueue, new Object[0])).isEqualTo(maxRequestsToQueue);
        }
    }

    @Test(timeout=120000L)
    public void testAzureBlobFileSystemBackReferenceInOutputStream() throws Exception {
        byte[] testBytes = new byte[5120];
        try (AbfsOutputStream out = this.getStream();){
            for (int i = 0; i < 5; ++i) {
                out.write(testBytes);
                out.flush();
                System.gc();
                ((AbstractBooleanAssert)Assertions.assertThat((out.getExecutorService().isShutdown() || out.getExecutorService().isTerminated() ? 1 : 0) != 0).describedAs("Executor Service should not be closed before OutputStream while writing", new Object[0])).isFalse();
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)out.getFsBackRef().isNull()).describedAs("BackReference in output stream should not be null", new Object[0])).isFalse();
            }
        }
    }

    @Test
    public void testAbfsOutputStreamClosingFsBeforeStream() throws Exception {
        AzureBlobFileSystem fs = new AzureBlobFileSystem();
        fs.initialize(new URI(this.getTestUrl()), new Configuration());
        Path pathFs = this.path(this.getMethodName());
        byte[] inputBytes = new byte[5120];
        try (AbfsOutputStream out = this.createAbfsOutputStreamWithFlushEnabled(fs, pathFs);){
            out.write(inputBytes);
            fs.close();
            LambdaTestUtils.intercept(PathIOException.class, (String)this.getMethodName(), () -> ((AbfsOutputStream)out).close());
        }
    }

    @Test
    public void testExpect100ContinueFailureInAppend() throws Exception {
        Configuration configuration = new Configuration(this.getRawConfiguration());
        configuration.set("fs.azure.account.expect.header.enabled", "true");
        AzureBlobFileSystem fs = this.getFileSystem(configuration);
        Path path = new Path("/testFile");
        AbfsOutputStream os = (AbfsOutputStream)Mockito.spy((Object)((AbfsOutputStream)fs.create(path).getWrappedStream()));
        AbfsClient spiedClient = (AbfsClient)Mockito.spy((Object)os.getClient());
        AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
        this.mockSetupForAppend(httpOpForAppendTest, spiedClient);
        ((AbfsOutputStream)Mockito.doReturn((Object)spiedClient).when((Object)os)).getClient();
        fs.delete(path, true);
        os.write(1);
        LambdaTestUtils.intercept(FileNotFoundException.class, () -> os.close());
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)httpOpForAppendTest[0].getConnectionDisconnectedOnError()).describedAs("First try from AbfsClient will have expect-100 header and should fail with expect-100 error.", new Object[0])).isTrue();
        if (httpOpForAppendTest[0] instanceof AbfsJdkHttpOperation) {
            ((AbfsJdkHttpOperation)Mockito.verify((Object)((AbfsJdkHttpOperation)httpOpForAppendTest[0]), (VerificationMode)Mockito.times((int)0))).processConnHeadersAndInputStreams((byte[])Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt());
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)httpOpForAppendTest[1].getConnectionDisconnectedOnError()).describedAs("The retried operation from AbfsClient should not fail with expect-100 error. The retried operation does not haveexpect-100 header.", new Object[0])).isFalse();
        if (httpOpForAppendTest[1] instanceof AbfsJdkHttpOperation) {
            ((AbfsJdkHttpOperation)Mockito.verify((Object)((AbfsJdkHttpOperation)httpOpForAppendTest[1]), (VerificationMode)Mockito.times((int)1))).processConnHeadersAndInputStreams((byte[])Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt());
        }
    }

    private void mockSetupForAppend(AbfsHttpOperation[] httpOpForAppendTest, AbfsClient spiedClient) {
        int[] index = new int[]{0};
        ((AbfsClient)Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
            AbfsRestOperation op = (AbfsRestOperation)Mockito.spy((Object)((AbfsRestOperation)abfsRestOpAppendGetInvocation.callRealMethod()));
            ((AbfsRestOperation)Mockito.doAnswer(createHttpOpInvocation -> {
                httpOpForAppendTest[index[0]] = (AbfsHttpOperation)Mockito.spy((Object)((AbfsHttpOperation)createHttpOpInvocation.callRealMethod()));
                int n = index[0];
                index[0] = n + 1;
                return httpOpForAppendTest[n];
            }).when((Object)op)).createHttpOperation();
            return op;
        }).when((Object)spiedClient)).getAbfsRestOperation((AbfsRestOperationType)Mockito.any(AbfsRestOperationType.class), Mockito.anyString(), (URL)Mockito.any(URL.class), Mockito.anyList(), (byte[])Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt(), (String)Mockito.nullable(String.class));
    }

    private AbfsOutputStream getStream() throws URISyntaxException, IOException {
        AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
        fs1.initialize(new URI(this.getTestUrl()), new Configuration());
        Path pathFs1 = this.path(this.getMethodName() + "1");
        return this.createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1);
    }
}

