/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobKeyTest;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerGetTest;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobHelpers;
import org.apache.flink.runtime.blob.TestingBlobUtils;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class BlobCachePutTest {
    @TempDir
    private java.nio.file.Path tempDir;
    private final Random rnd = new Random();

    BlobCachePutTest() {
    }

    @Test
    void testTransientBlobCacheGetStorageLocationConcurrentNoJob() throws Exception {
        this.testTransientBlobCacheGetStorageLocationConcurrent(null);
    }

    @Test
    void testTransientBlobCacheGetStorageLocationConcurrentForJob() throws Exception {
        this.testTransientBlobCacheGetStorageLocationConcurrent(new JobID());
    }

    private void testTransientBlobCacheGetStorageLocationConcurrent(@Nullable JobID jobId) throws Exception {
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir);
             TransientBlobCache cache = TestingBlobUtils.createTransientCache(this.tempDir, server);){
            server.start();
            TransientBlobKey key = new TransientBlobKey();
            CheckedThread[] threads = new CheckedThread[]{new TransientBlobCacheGetStorageLocation(cache, jobId, (BlobKey)key), new TransientBlobCacheGetStorageLocation(cache, jobId, (BlobKey)key), new TransientBlobCacheGetStorageLocation(cache, jobId, (BlobKey)key)};
            this.checkedThreadSimpleTest(threads);
        }
    }

    @Test
    void testPermanentBlobCacheGetStorageLocationConcurrentForJob() throws Exception {
        JobID jobId = new JobID();
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir);
             PermanentBlobCache cache = TestingBlobUtils.createPermanentCache(this.tempDir, server);){
            server.start();
            PermanentBlobKey key = new PermanentBlobKey();
            CheckedThread[] threads = new CheckedThread[]{new PermanentBlobCacheGetStorageLocation(cache, jobId, (BlobKey)key), new PermanentBlobCacheGetStorageLocation(cache, jobId, (BlobKey)key), new PermanentBlobCacheGetStorageLocation(cache, jobId, (BlobKey)key)};
            this.checkedThreadSimpleTest(threads);
        }
    }

    private void checkedThreadSimpleTest(CheckedThread[] threads) throws Exception {
        for (CheckedThread t : threads) {
            t.start();
        }
        for (CheckedThread t : threads) {
            t.sync();
        }
    }

    @Test
    void testPutBufferTransientSuccessfulGet1() throws IOException, InterruptedException {
        this.testPutBufferSuccessfulGet(null, null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testPutBufferTransientSuccessfulGet2() throws IOException, InterruptedException {
        this.testPutBufferSuccessfulGet(null, new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testPutBufferTransientSuccessfulGet3() throws IOException, InterruptedException {
        this.testPutBufferSuccessfulGet(new JobID(), new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testPutBufferTransientSuccessfulGet4() throws IOException, InterruptedException {
        this.testPutBufferSuccessfulGet(new JobID(), null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testPutBufferPermanentSuccessfulGet() throws IOException, InterruptedException {
        this.testPutBufferSuccessfulGet(new JobID(), new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testPutBufferSuccessfulGet(@Nullable JobID jobId1, @Nullable JobID jobId2, BlobKey.BlobType blobType) throws IOException, InterruptedException {
        Tuple2<BlobServer, BlobCacheService> serverAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        try (BlobServer server = (BlobServer)serverAndCache.f0;
             BlobCacheService cache = (BlobCacheService)serverAndCache.f1;){
            server.start();
            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            byte[] data2 = Arrays.copyOfRange(data, 10, 54);
            BlobKey key1a = BlobServerPutTest.put((BlobService)cache, jobId1, data, blobType);
            Assertions.assertThat((Comparable)key1a).isNotNull();
            BlobKeyTest.verifyType(blobType, key1a);
            BlobKey key1a2 = BlobServerPutTest.put((BlobService)cache, jobId1, data, blobType);
            Assertions.assertThat((Comparable)key1a2).isNotNull();
            BlobKeyTest.verifyType(blobType, key1a2);
            BlobKeyTest.verifyKeyDifferentHashEquals(key1a, key1a2);
            BlobKey key1b = BlobServerPutTest.put((BlobService)cache, jobId1, data2, blobType);
            Assertions.assertThat((Comparable)key1b).isNotNull();
            BlobKeyTest.verifyType(blobType, key1b);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a2, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1b, data2);
            BlobKey key2a = BlobServerPutTest.put((BlobService)cache, jobId2, data, blobType);
            Assertions.assertThat((Comparable)key2a).isNotNull();
            BlobKeyTest.verifyType(blobType, key2a);
            BlobKeyTest.verifyKeyDifferentHashEquals(key1a, key2a);
            BlobKey key2b = BlobServerPutTest.put((BlobService)cache, jobId2, data2, blobType);
            Assertions.assertThat((Comparable)key2b).isNotNull();
            BlobKeyTest.verifyType(blobType, key2b);
            BlobKeyTest.verifyKeyDifferentHashEquals(key1b, key2b);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a2, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1b, data2);
            BlobServerPutTest.verifyContents((BlobService)server, jobId2, key2a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId2, key2b, data2);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, key1a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, key1b, data2);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, key2a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, key2b, data2);
            if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                BlobCachePutTest.verifyDeletedEventually(server, jobId1, key1a);
                BlobCachePutTest.verifyDeletedEventually(server, jobId1, key1b);
                BlobCachePutTest.verifyDeletedEventually(server, jobId2, key2a);
                BlobCachePutTest.verifyDeletedEventually(server, jobId2, key2b);
                BlobServerPutTest.verifyContents((BlobService)cache, jobId1, key1a, data);
                BlobServerPutTest.verifyContents((BlobService)cache, jobId1, key1b, data2);
                BlobServerPutTest.verifyContents((BlobService)cache, jobId2, key2a, data);
                BlobServerPutTest.verifyContents((BlobService)cache, jobId2, key2b, data2);
            } else {
                BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a, data);
                BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1b, data2);
                BlobServerPutTest.verifyContents((BlobService)server, jobId2, key2a, data);
                BlobServerPutTest.verifyContents((BlobService)server, jobId2, key2b, data2);
            }
        }
    }

    @Test
    void testPutStreamTransientSuccessfulGet1() throws IOException, InterruptedException {
        this.testPutStreamTransientSuccessfulGet(null, null);
    }

    @Test
    void testPutStreamTransientSuccessfulGet2() throws IOException, InterruptedException {
        this.testPutStreamTransientSuccessfulGet(null, new JobID());
    }

    @Test
    void testPutStreamTransientSuccessfulGet3() throws IOException, InterruptedException {
        this.testPutStreamTransientSuccessfulGet(new JobID(), new JobID());
    }

    @Test
    void testPutStreamTransientSuccessfulGet4() throws IOException, InterruptedException {
        this.testPutStreamTransientSuccessfulGet(new JobID(), null);
    }

    private void testPutStreamTransientSuccessfulGet(@Nullable JobID jobId1, @Nullable JobID jobId2) throws IOException, InterruptedException {
        Tuple2<BlobServer, BlobCacheService> serverAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        try (BlobServer server = (BlobServer)serverAndCache.f0;
             BlobCacheService cache = (BlobCacheService)serverAndCache.f1;){
            server.start();
            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            byte[] data2 = Arrays.copyOfRange(data, 10, 54);
            TransientBlobKey key1a = (TransientBlobKey)BlobServerPutTest.put((BlobService)cache, jobId1, new ByteArrayInputStream(data), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key1a).isNotNull();
            BlobKey key1a2 = BlobServerPutTest.put((BlobService)cache, jobId1, new ByteArrayInputStream(data), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key1a2).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals((BlobKey)key1a, key1a2);
            TransientBlobKey key1b = (TransientBlobKey)BlobServerPutTest.put((BlobService)cache, jobId1, new ByteArrayInputStream(data2), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key1b).isNotNull();
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, (BlobKey)key1a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a2, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, (BlobKey)key1b, data2);
            TransientBlobKey key2a = (TransientBlobKey)BlobServerPutTest.put((BlobService)cache, jobId2, new ByteArrayInputStream(data), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key2a).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals((BlobKey)key1a, (BlobKey)key2a);
            TransientBlobKey key2b = (TransientBlobKey)BlobServerPutTest.put((BlobService)cache, jobId2, new ByteArrayInputStream(data2), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key2b).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals((BlobKey)key1b, (BlobKey)key2b);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, (BlobKey)key1a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a2, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, (BlobKey)key1b, data2);
            BlobServerPutTest.verifyContents((BlobService)server, jobId2, (BlobKey)key2a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId2, (BlobKey)key2b, data2);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, (BlobKey)key1a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, (BlobKey)key1b, data2);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, (BlobKey)key2a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, (BlobKey)key2b, data2);
            BlobCachePutTest.verifyDeletedEventually(server, jobId1, new BlobKey[]{key1a});
            BlobCachePutTest.verifyDeletedEventually(server, jobId1, new BlobKey[]{key1b});
            BlobCachePutTest.verifyDeletedEventually(server, jobId2, new BlobKey[]{key2a});
            BlobCachePutTest.verifyDeletedEventually(server, jobId2, new BlobKey[]{key2b});
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, (BlobKey)key1a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, (BlobKey)key1b, data2);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, (BlobKey)key2a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, (BlobKey)key2b, data2);
        }
    }

    @Test
    void testPutChunkedStreamTransientSuccessfulGet1() throws IOException, InterruptedException {
        this.testPutChunkedStreamTransientSuccessfulGet(null, null);
    }

    @Test
    void testPutChunkedStreamTransientSuccessfulGet2() throws IOException, InterruptedException {
        this.testPutChunkedStreamTransientSuccessfulGet(null, new JobID());
    }

    @Test
    void testPutChunkedStreamTransientSuccessfulGet3() throws IOException, InterruptedException {
        this.testPutChunkedStreamTransientSuccessfulGet(new JobID(), new JobID());
    }

    @Test
    void testPutChunkedStreamTransientSuccessfulGet4() throws IOException, InterruptedException {
        this.testPutChunkedStreamTransientSuccessfulGet(new JobID(), null);
    }

    private void testPutChunkedStreamTransientSuccessfulGet(@Nullable JobID jobId1, @Nullable JobID jobId2) throws IOException, InterruptedException {
        Tuple2<BlobServer, BlobCacheService> serverAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        try (BlobServer server = (BlobServer)serverAndCache.f0;
             BlobCacheService cache = (BlobCacheService)serverAndCache.f1;){
            server.start();
            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            byte[] data2 = Arrays.copyOfRange(data, 10, 54);
            TransientBlobKey key1a = (TransientBlobKey)BlobServerPutTest.put((BlobService)cache, jobId1, new BlobServerPutTest.ChunkedInputStream(data, 19), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key1a).isNotNull();
            BlobKey key1a2 = BlobServerPutTest.put((BlobService)cache, jobId1, new BlobServerPutTest.ChunkedInputStream(data, 19), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key1a2).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals((BlobKey)key1a, key1a2);
            TransientBlobKey key1b = (TransientBlobKey)BlobServerPutTest.put((BlobService)cache, jobId1, new BlobServerPutTest.ChunkedInputStream(data2, 19), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key1b).isNotNull();
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, (BlobKey)key1a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a2, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, (BlobKey)key1b, data2);
            TransientBlobKey key2a = (TransientBlobKey)BlobServerPutTest.put((BlobService)cache, jobId2, new BlobServerPutTest.ChunkedInputStream(data, 19), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key2a).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals((BlobKey)key1a, (BlobKey)key2a);
            TransientBlobKey key2b = (TransientBlobKey)BlobServerPutTest.put((BlobService)cache, jobId2, new BlobServerPutTest.ChunkedInputStream(data2, 19), BlobKey.BlobType.TRANSIENT_BLOB);
            Assertions.assertThat((Comparable)key2b).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals((BlobKey)key1b, (BlobKey)key2b);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, (BlobKey)key1a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, key1a2, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId1, (BlobKey)key1b, data2);
            BlobServerPutTest.verifyContents((BlobService)server, jobId2, (BlobKey)key2a, data);
            BlobServerPutTest.verifyContents((BlobService)server, jobId2, (BlobKey)key2b, data2);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, (BlobKey)key1a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, (BlobKey)key1b, data2);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, (BlobKey)key2a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, (BlobKey)key2b, data2);
            BlobCachePutTest.verifyDeletedEventually(server, jobId1, new BlobKey[]{key1a});
            BlobCachePutTest.verifyDeletedEventually(server, jobId1, new BlobKey[]{key1b});
            BlobCachePutTest.verifyDeletedEventually(server, jobId2, new BlobKey[]{key2a});
            BlobCachePutTest.verifyDeletedEventually(server, jobId2, new BlobKey[]{key2b});
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, (BlobKey)key1a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId1, (BlobKey)key1b, data2);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, (BlobKey)key2a, data);
            BlobServerPutTest.verifyContents((BlobService)cache, jobId2, (BlobKey)key2b, data2);
        }
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsNoJob() throws IOException {
        this.testPutBufferFails(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsForJob() throws IOException {
        this.testPutBufferFails(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsForJobHa() throws IOException {
        this.testPutBufferFails(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testPutBufferFails(@Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException {
        ((AbstractBooleanAssert)Assumptions.assumeThat((boolean)OperatingSystem.isWindows()).as("setWritable doesn't work on Windows", new Object[0])).isFalse();
        Tuple2<BlobServer, BlobCacheService> serverAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        try (BlobServer server = (BlobServer)serverAndCache.f0;
             BlobCacheService cache = (BlobCacheService)serverAndCache.f1;){
            server.start();
            File tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
            Assertions.assertThat((boolean)tempFileDir.setExecutable(true, false)).isTrue();
            Assertions.assertThat((boolean)tempFileDir.setReadable(true, false)).isTrue();
            Assertions.assertThat((boolean)tempFileDir.setWritable(false, false)).isTrue();
            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> BlobServerPutTest.put((BlobService)cache, jobId, data, blobType)).isInstanceOf(IOException.class)).hasMessageStartingWith("PUT operation failed: ");
            Assertions.assertThat((boolean)tempFileDir.setWritable(true, false)).isTrue();
        }
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsIncomingNoJob() throws IOException {
        this.testPutBufferFailsIncoming(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsIncomingForJob() throws IOException {
        this.testPutBufferFailsIncoming(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsIncomingForJobHa() throws IOException {
        this.testPutBufferFailsIncoming(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testPutBufferFailsIncoming(@Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException {
        ((AbstractBooleanAssert)Assumptions.assumeThat((boolean)OperatingSystem.isWindows()).as("setWritable doesn't work on Windows", new Object[0])).isFalse();
        Tuple2<BlobServer, BlobCacheService> serverAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        File tempFileDir = null;
        try (BlobServer server = (BlobServer)serverAndCache.f0;
             BlobCacheService cache = (BlobCacheService)serverAndCache.f1;){
            File storageDir;
            server.start();
            tempFileDir = server.createTemporaryFilename().getParentFile();
            Assertions.assertThat((boolean)tempFileDir.setExecutable(true, false)).isTrue();
            Assertions.assertThat((boolean)tempFileDir.setReadable(true, false)).isTrue();
            Assertions.assertThat((boolean)tempFileDir.setWritable(false, false)).isTrue();
            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            try {
                ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> BlobServerPutTest.put((BlobService)cache, jobId, data, blobType)).isInstanceOf(IOException.class)).hasMessageStartingWith("PUT operation failed: ");
                storageDir = tempFileDir.getParentFile();
            }
            catch (Throwable throwable) {
                File storageDir2 = tempFileDir.getParentFile();
                Assertions.assertThat((Object[])storageDir2.list()).containsExactly((Object[])new String[]{"incoming"});
                throw throwable;
            }
            Assertions.assertThat((Object[])storageDir.list()).containsExactly((Object[])new String[]{"incoming"});
        }
        finally {
            if (tempFileDir != null) {
                tempFileDir.setWritable(true, false);
            }
        }
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsStoreNoJob() throws IOException {
        this.testPutBufferFailsStore(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsStoreForJob() throws IOException {
        this.testPutBufferFailsStore(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag(value="org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsStoreForJobHa() throws IOException {
        this.testPutBufferFailsStore(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testPutBufferFailsStore(@Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException {
        ((AbstractBooleanAssert)Assumptions.assumeThat((boolean)OperatingSystem.isWindows()).as("setWritable doesn't work on Windows", new Object[0])).isFalse();
        Tuple2<BlobServer, BlobCacheService> serverAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        File jobStoreDir = null;
        try (BlobServer server = (BlobServer)serverAndCache.f0;
             BlobCacheService cache = (BlobCacheService)serverAndCache.f1;){
            server.start();
            jobStoreDir = server.getStorageLocation(jobId, BlobKey.createKey((BlobKey.BlobType)blobType)).getParentFile();
            Assertions.assertThat((boolean)jobStoreDir.setExecutable(true, false)).isTrue();
            Assertions.assertThat((boolean)jobStoreDir.setReadable(true, false)).isTrue();
            Assertions.assertThat((boolean)jobStoreDir.setWritable(false, false)).isTrue();
            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            try {
                ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> BlobServerPutTest.put((BlobService)cache, jobId, data, blobType)).isInstanceOf(IOException.class)).hasMessageStartingWith("PUT operation failed: ");
            }
            finally {
                File incomingFileDir = new File(jobStoreDir.getParent(), "incoming");
                Assertions.assertThat((Object[])incomingFileDir.list()).isEmpty();
                Assertions.assertThat((Object[])jobStoreDir.list()).isEmpty();
            }
        }
        finally {
            if (jobStoreDir != null) {
                jobStoreDir.setWritable(true, false);
            }
        }
    }

    @Test
    void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentPutOperations(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentPutOperations(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testConcurrentPutOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
        this.testConcurrentPutOperations(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testConcurrentPutOperations(@Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException, InterruptedException, ExecutionException {
        List<Path> jars;
        Configuration config = new Configuration();
        BlobStore blobStoreServer = (BlobStore)Mockito.mock(BlobStore.class);
        BlobStore blobStoreCache = (BlobStore)Mockito.mock(BlobStore.class);
        int concurrentPutOperations = 2;
        int dataSize = 1024;
        CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
        byte[] data = new byte[dataSize];
        if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
            File tmpFile = new File(this.tempDir.toFile(), "test_file");
            FileUtils.writeByteArrayToFile((File)tmpFile, (byte[])data);
            jars = Collections.singletonList(new Path(tmpFile.getAbsolutePath()));
        } else {
            jars = null;
        }
        ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList<CompletableFuture<BlobKey>>(concurrentPutOperations);
        ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
        Tuple2<BlobServer, BlobCacheService> serverAndCache = TestingBlobUtils.createServerAndCache(this.tempDir, blobStoreServer, blobStoreCache);
        try (BlobServer server = (BlobServer)serverAndCache.f0;
             BlobCacheService cache = (BlobCacheService)serverAndCache.f1;){
            server.start();
            InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
            for (int i = 0; i < concurrentPutOperations; ++i) {
                Supplier<BlobKey> callable = blobType == BlobKey.BlobType.PERMANENT_BLOB ? () -> {
                    try {
                        List keys = BlobClient.uploadFiles((InetSocketAddress)serverAddress, (Configuration)config, (JobID)jobId, (List)jars);
                        Assertions.assertThat((List)keys).hasSize(1);
                        BlobKey uploadedKey = (BlobKey)keys.get(0);
                        BlobServerPutTest.verifyContents((BlobService)server, jobId, uploadedKey, data);
                        return uploadedKey;
                    }
                    catch (IOException e) {
                        throw new CompletionException((Throwable)new FlinkException("Could not upload blob.", (Throwable)e));
                    }
                } : () -> {
                    try {
                        BlobServerPutTest.BlockingInputStream inputStream = new BlobServerPutTest.BlockingInputStream(countDownLatch, data);
                        BlobKey uploadedKey = BlobServerPutTest.put((BlobService)cache, jobId, inputStream, blobType);
                        BlobServerPutTest.verifyContents((BlobService)server, jobId, uploadedKey, data);
                        return uploadedKey;
                    }
                    catch (IOException e) {
                        throw new CompletionException((Throwable)new FlinkException("Could not upload blob.", (Throwable)e));
                    }
                };
                CompletableFuture<BlobKey> putFuture = CompletableFuture.supplyAsync(callable, executor);
                allFutures.add(putFuture);
            }
            FutureUtils.ConjunctFuture conjunctFuture = FutureUtils.combineAll(allFutures);
            Collection blobKeys = (Collection)conjunctFuture.get();
            Iterator blobKeyIterator = blobKeys.iterator();
            Assertions.assertThat(blobKeyIterator).hasNext();
            BlobKey blobKey = (BlobKey)blobKeyIterator.next();
            while (blobKeyIterator.hasNext()) {
                BlobKeyTest.verifyKeyDifferentHashEquals(blobKey, (BlobKey)blobKeyIterator.next());
            }
            BlobServerPutTest.verifyContents((BlobService)server, jobId, blobKey, data);
            if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                ((BlobStore)Mockito.verify((Object)blobStoreServer, (VerificationMode)Mockito.times((int)1))).put((File)ArgumentMatchers.any(File.class), (JobID)ArgumentMatchers.eq((Object)jobId), (BlobKey)ArgumentMatchers.eq((Object)blobKey));
            } else {
                ((BlobStore)Mockito.verify((Object)blobStoreServer, (VerificationMode)Mockito.times((int)0))).put((File)ArgumentMatchers.any(File.class), (JobID)ArgumentMatchers.eq((Object)jobId), (BlobKey)ArgumentMatchers.eq((Object)blobKey));
            }
            ((BlobStore)Mockito.verify((Object)blobStoreCache, (VerificationMode)Mockito.times((int)0))).put((File)ArgumentMatchers.any(File.class), (JobID)ArgumentMatchers.eq((Object)jobId), (BlobKey)ArgumentMatchers.eq((Object)blobKey));
        }
        finally {
            executor.shutdownNow();
        }
    }

    static void verifyDeletedEventually(BlobServer server, @Nullable JobID jobId, BlobKey ... keys) throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + 30000L;
        do {
            Thread.sleep(10L);
        } while (TestingBlobHelpers.checkFilesExist(jobId, Arrays.asList(keys), server, false) != 0 && System.currentTimeMillis() < deadline);
        for (BlobKey key : keys) {
            BlobServerGetTest.verifyDeleted((BlobService)server, jobId, key);
        }
    }

    private static class TransientBlobCacheGetStorageLocation
    extends CheckedThread {
        private final TransientBlobCache cache;
        private final JobID jobId;
        private final BlobKey key;

        TransientBlobCacheGetStorageLocation(TransientBlobCache cache, @Nullable JobID jobId, BlobKey key) {
            this.cache = cache;
            this.jobId = jobId;
            this.key = key;
        }

        public void go() throws Exception {
            this.cache.getStorageLocation(this.jobId, this.key);
        }
    }

    private static class PermanentBlobCacheGetStorageLocation
    extends CheckedThread {
        private final PermanentBlobCache cache;
        private final JobID jobId;
        private final BlobKey key;

        PermanentBlobCacheGetStorageLocation(PermanentBlobCache cache, JobID jobId, BlobKey key) {
            this.cache = cache;
            this.jobId = jobId;
            this.key = key;
        }

        public void go() throws Exception {
            this.cache.getStorageLocation(this.jobId, this.key);
        }
    }
}

