/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCachePutTest;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerGetTest;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TestingBlobHelpers;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.blob.TestingBlobUtils;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.TriConsumerWithException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class BlobServerCleanupTest {
    /** Shared pseudo-random source used by {@link #createRandomData()} to fill BLOB payloads. */
    private static final Random RANDOM = new Random();
    /** Temporary directory (JUnit {@code @TempDir}) that the tests pass on as BLOB storage directory. */
    @TempDir
    private File temporaryFolder;

    /** Explicit no-argument constructor; performs no initialization. */
    BlobServerCleanupTest() {
    }

    /**
     * Produces a fresh payload filled with pseudo-random bytes taken from {@link #RANDOM}.
     *
     * @return a newly allocated array of 2,000,000 random bytes
     */
    private static byte[] createRandomData() {
        final byte[] payload = new byte[2_000_000];
        RANDOM.nextBytes(payload);
        return payload;
    }

    /**
     * Creates a {@link BlobServer} that uses a {@link VoidBlobStore} as its blob store.
     *
     * @param storageDirectoryPath path of the local storage directory
     * @param cleanupInterval value to configure as {@link BlobServerOptions#CLEANUP_INTERVAL}
     * @return the new (not yet started) server instance
     * @throws IOException if the server cannot be created
     */
    private static BlobServer createTestInstance(String storageDirectoryPath, long cleanupInterval) throws IOException {
        final BlobStore voidBlobStore = new VoidBlobStore();
        return createTestInstance(storageDirectoryPath, cleanupInterval, voidBlobStore);
    }

    /**
     * Creates a {@link BlobServer} for the given storage directory, cleanup interval and blob store.
     *
     * <p>The storage directory and the cleanup interval are written into a fresh {@link
     * Configuration}; the same directory is also handed to the server as its storage {@link File}.
     *
     * @param storageDirectoryPath path of the local storage directory
     * @param cleanupInterval value to configure as {@link BlobServerOptions#CLEANUP_INTERVAL}
     * @param blobStore blob store the server is created with
     * @return the new (not yet started) server instance
     * @throws IOException if the server cannot be created
     */
    private static BlobServer createTestInstance(String storageDirectoryPath, long cleanupInterval, BlobStore blobStore) throws IOException {
        final Configuration config = new Configuration();
        // Pass the values with their real types so the option's type parameter can be inferred.
        config.set(BlobServerOptions.STORAGE_DIRECTORY, storageDirectoryPath);
        config.set(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);

        final File storageDirectory = new File(storageDirectoryPath);
        return new BlobServer(config, storageDirectory, blobStore);
    }

    /** Runs the transient BLOB cleanup scenario with a {@code null} job ID. */
    @Test
    void testTransientBlobNoJobCleanup() throws IOException, InterruptedException, ExecutionException {
        final JobID noJob = null;
        testTransientBlobCleanup(noJob);
    }

    /** Runs the transient BLOB cleanup scenario with a freshly generated job ID. */
    @Test
    void testTransientBlobForJobCleanup() throws IOException, InterruptedException, ExecutionException {
        final JobID jobId = new JobID();
        testTransientBlobCleanup(jobId);
    }

    /**
     * Checks the expiry bookkeeping and eventual deletion of transient BLOBs.
     *
     * <p>Two transient BLOBs and one permanent BLOB are uploaded. The test then verifies that
     *
     * <ul>
     *   <li>each PUT records an expiry time of at least "now + cleanup interval",
     *   <li>reading one transient BLOB raises its own expiry time and leaves the other one untouched,
     *   <li>after several threads fetched the first BLOB concurrently for a short while, both
     *       transient BLOBs are deleted eventually, and
     *   <li>the permanent BLOB still has its original contents.
     * </ul>
     *
     * @param jobId job to store the transient BLOBs under, or {@code null} for no job
     * @throws IOException if a BLOB operation fails
     * @throws InterruptedException if the thread is interrupted while sleeping or waiting
     * @throws ExecutionException if one of the concurrent GET operations fails
     */
    private void testTransientBlobCleanup(@Nullable JobID jobId) throws IOException, InterruptedException, ExecutionException {
        final long cleanupInterval = 1L;
        final int numberConcurrentGetOperations = 3;

        final ArrayList<CompletableFuture<Void>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
        final byte[] data = createRandomData();
        final byte[] data2 = createRandomData();

        try (BlobServer server = createTestInstance(temporaryFolder.getAbsolutePath(), cleanupInterval)) {
            final ConcurrentMap<?, ?> transientBlobExpiryTimes = server.getBlobExpiryTimes();
            server.start();

            // after put, the expiry time must be at least "now + interval"
            long cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            final TransientBlobKey key1 = (TransientBlobKey) BlobServerPutTest.put(server, jobId, data, BlobKey.BlobType.TRANSIENT_BLOB);
            final Long key1ExpiryAfterPut = expiryTimeOf(transientBlobExpiryTimes, jobId, key1);
            Assertions.assertThat(key1ExpiryAfterPut).isGreaterThanOrEqualTo(cleanupLowerBound);

            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            final TransientBlobKey key2 = (TransientBlobKey) BlobServerPutTest.put(server, jobId, data2, BlobKey.BlobType.TRANSIENT_BLOB);
            final Long key2ExpiryAfterPut = expiryTimeOf(transientBlobExpiryTimes, jobId, key2);
            Assertions.assertThat(key2ExpiryAfterPut).isGreaterThanOrEqualTo(cleanupLowerBound);

            // permanent BLOB used as a control; it always needs a job ID
            final JobID jobIdHA = jobId == null ? new JobID() : jobId;
            final BlobKey keyHA = BlobServerPutTest.put(server, jobIdHA, data, BlobKey.BlobType.PERMANENT_BLOB);

            // access key1: its expiry must move forward while key2's stays unchanged
            // (sleep at least 1ms so that a strictly greater expiry time is observable)
            Thread.sleep(1L);
            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            BlobServerPutTest.verifyContents(server, jobId, key1, data);
            final Long key1ExpiryAfterGet = expiryTimeOf(transientBlobExpiryTimes, jobId, key1);
            Assertions.assertThat(key1ExpiryAfterGet).isGreaterThan(key1ExpiryAfterPut);
            Assertions.assertThat(key1ExpiryAfterGet).isGreaterThanOrEqualTo(cleanupLowerBound);
            Assertions.assertThat(key2ExpiryAfterPut).isEqualTo(expiryTimeOf(transientBlobExpiryTimes, jobId, key2));

            // access key2: its expiry must move forward while key1's stays unchanged
            Thread.sleep(1L);
            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            BlobServerPutTest.verifyContents(server, jobId, key2, data2);
            Assertions.assertThat(key1ExpiryAfterGet).isEqualTo(expiryTimeOf(transientBlobExpiryTimes, jobId, key1));
            Assertions.assertThat(expiryTimeOf(transientBlobExpiryTimes, jobId, key2)).isGreaterThan(key2ExpiryAfterPut);
            Assertions.assertThat(expiryTimeOf(transientBlobExpiryTimes, jobId, key2)).isGreaterThanOrEqualTo(cleanupLowerBound);

            // fetch key1 repeatedly from several threads until finishTime has passed
            final long finishTime = System.currentTimeMillis() + 3L * cleanupInterval;
            final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
            try {
                for (int i = 0; i < numberConcurrentGetOperations; i++) {
                    final CompletableFuture<Void> getOperation = CompletableFuture.supplyAsync(() -> {
                        try {
                            while (System.currentTimeMillis() < finishTime) {
                                BlobServerGetTest.get(server, jobId, key1);
                            }
                            return null;
                        } catch (IOException e) {
                            throw new CompletionException(new FlinkException("Could not retrieve blob.", e));
                        }
                    }, executor);
                    getOperations.add(getOperation);
                }

                // wait for all GET loops; fails with ExecutionException if any of them failed
                FutureUtils.combineAll(getOperations).get();
            } finally {
                // the pool's worker threads would otherwise outlive the test
                executor.shutdownNow();
            }

            // both transient BLOBs are expected to disappear, the permanent one must be intact
            BlobCachePutTest.verifyDeletedEventually(server, jobId, new BlobKey[] {key1, key2});
            BlobServerPutTest.verifyContents(server, jobIdHA, keyHA, data);
        }
    }

    /** Looks up the expiry time recorded for the given (job ID, transient key) pair, or {@code null} if absent. */
    private static Long expiryTimeOf(ConcurrentMap<?, ?> expiryTimes, @Nullable JobID jobId, TransientBlobKey key) {
        return (Long) expiryTimes.get(Tuple2.of(jobId, key));
    }

    /**
     * Runs the cleanup scenario through {@code localCleanupAsync} and expects no files to remain
     * for the cleaned-up job. The blob store fails the test if its delete-all function is invoked.
     */
    @Test
    void testLocalCleanup() throws Exception {
        final TestingBlobStore blobStore =
                createTestingBlobStoreBuilder()
                        .setDeleteAllFunction(
                                jobDataToDelete ->
                                        Assertions.fail(
                                                "No deleteAll call is expected to be triggered but was for %s.",
                                                jobDataToDelete))
                        .createTestingBlobStore();

        // The lambda is typed by the parameter of testSuccessfulCleanup; no raw cast is needed.
        testSuccessfulCleanup(
                new JobID(),
                (testInstance, jobId, executor) ->
                        testInstance.localCleanupAsync(jobId, executor).join(),
                blobStore);
    }

    /**
     * Runs the cleanup scenario through {@code globalCleanupAsync} and expects no files to remain
     * for the cleaned-up job. Additionally verifies that the blob store's delete-all function was
     * called for exactly that job.
     */
    @Test
    void testGlobalCleanup() throws Exception {
        final HashSet<JobID> actuallyDeletedJobData = new HashSet<>();
        final JobID jobId = new JobID();

        // record every job the blob store is asked to wipe and report success
        final TestingBlobStore blobStore =
                createTestingBlobStoreBuilder()
                        .setDeleteAllFunction(
                                jobDataToDelete -> {
                                    actuallyDeletedJobData.add(jobDataToDelete);
                                    return true;
                                })
                        .createTestingBlobStore();

        testSuccessfulCleanup(
                jobId,
                (testInstance, jobIdForCleanup, executor) ->
                        testInstance.globalCleanupAsync(jobIdForCleanup, executor).join(),
                blobStore);

        Assertions.assertThat(actuallyDeletedJobData).containsExactlyInAnyOrder(jobId);
    }

    /**
     * Uses a blob store whose delete-all function returns {@code false} and expects {@code
     * globalCleanupAsync} to fail with an {@link ExecutionException} caused by an {@link
     * IOException}. Two files are expected to remain for the job under test.
     */
    @Test
    void testGlobalCleanupUnsuccessfulInBlobStore() throws Exception {
        final TestingBlobStore blobStore =
                createTestingBlobStoreBuilder()
                        .setDeleteAllFunction(jobDataToDelete -> false)
                        .createTestingBlobStore();

        // NOTE(review): the cleanup is requested for a fresh JobID, not for the callback's jobId
        // parameter - confirm this is intentional before changing it.
        testFailedCleanup(
                new JobID(),
                (testInstance, jobId, executor) ->
                        Assertions.assertThatThrownBy(
                                        () -> testInstance.globalCleanupAsync(new JobID(), executor).get())
                                .isInstanceOf(ExecutionException.class)
                                .hasCauseInstanceOf(IOException.class),
                blobStore);
    }

    /**
     * Uses a blob store whose delete-all function throws a {@link RuntimeException} and expects
     * {@code globalCleanupAsync} to fail with an {@link ExecutionException} that has exactly that
     * exception as its cause. Two files are expected to remain for the job under test.
     */
    @Test
    void testGlobalCleanupFailureInBlobStore() throws Exception {
        final RuntimeException actualException = new RuntimeException("Expected RuntimeException");
        final TestingBlobStore blobStore =
                createTestingBlobStoreBuilder()
                        .setDeleteAllFunction(
                                jobDataToDelete -> {
                                    throw actualException;
                                })
                        .createTestingBlobStore();

        // NOTE(review): the cleanup is requested for a fresh JobID, not for the callback's jobId
        // parameter - confirm this is intentional before changing it.
        testFailedCleanup(
                new JobID(),
                (testInstance, jobId, executor) ->
                        Assertions.assertThatThrownBy(
                                        () -> testInstance.globalCleanupAsync(new JobID(), executor).get())
                                .isInstanceOf(ExecutionException.class)
                                .hasCause(actualException),
                blobStore);
    }

    /**
     * Creates a blob store builder whose delete function for individual blobs always throws an
     * {@link UnsupportedOperationException}.
     *
     * @return the pre-configured builder
     */
    private TestingBlobStoreBuilder createTestingBlobStoreBuilder() {
        final TestingBlobStoreBuilder builder = new TestingBlobStoreBuilder();
        return builder.setDeleteFunction(
                (jobId, blobKey) -> {
                    throw new UnsupportedOperationException("Deletion of individual blobs is not supported.");
                });
    }

    /**
     * Runs {@link #testCleanup} expecting that two files are still present for the job afterwards.
     *
     * @param jobId job whose blobs are created and then passed to the callback
     * @param callback cleanup action under test
     * @param blobStore blob store the server is created with
     * @throws Exception if the scenario or the callback fails
     */
    private void testFailedCleanup(JobID jobId, TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback, BlobStore blobStore) throws Exception {
        final int remainingFilesAfterCleanup = 2;
        testCleanup(jobId, callback, blobStore, remainingFilesAfterCleanup);
    }

    /**
     * Runs {@link #testCleanup} expecting that no file is left for the job afterwards.
     *
     * @param jobId job whose blobs are created and then passed to the callback
     * @param callback cleanup action under test
     * @param blobStore blob store the server is created with
     * @throws Exception if the scenario or the callback fails
     */
    private void testSuccessfulCleanup(JobID jobId, TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback, BlobStore blobStore) throws Exception {
        final int remainingFilesAfterCleanup = 0;
        testCleanup(jobId, callback, blobStore, remainingFilesAfterCleanup);
    }

    /*
     * WARNING (CFR decompiler) - a try block that caught itself was removed here; behaviour may differ from the original.
     */
    /**
     * Shared cleanup scenario.
     *
     * <p>Stores one transient and one permanent blob for {@code jobId} and for a second, unrelated
     * job, checks that all of them exist, invokes {@code callback} and finally verifies the number
     * of files left for {@code jobId} as well as that the other job's files still exist.
     *
     * @param jobId job that is passed to the callback
     * @param callback cleanup action under test; receives the server, {@code jobId} and a
     *     single-threaded executor
     * @param blobStore blob store the server is created with
     * @param expectedFileCountAfterCleanup number of files expected for {@code jobId} after the
     *     callback ran
     * @throws Exception if the scenario or the callback fails
     */
    private void testCleanup(JobID jobId, TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback, BlobStore blobStore, int expectedFileCountAfterCleanup) throws Exception {
        final JobID otherJobId = new JobID();
        final ExecutorService executorService = Executors.newSingleThreadExecutor();

        try (BlobServer testInstance = createTestInstance(temporaryFolder.getAbsolutePath(), Integer.MAX_VALUE, blobStore)) {
            testInstance.start();

            final BlobKey transientDataBlobKey = BlobServerPutTest.put(testInstance, jobId, createRandomData(), BlobKey.BlobType.TRANSIENT_BLOB);
            final BlobKey otherTransientDataBlobKey = BlobServerPutTest.put(testInstance, otherJobId, createRandomData(), BlobKey.BlobType.TRANSIENT_BLOB);
            final BlobKey permanentDataBlobKey = BlobServerPutTest.put(testInstance, jobId, createRandomData(), BlobKey.BlobType.PERMANENT_BLOB);
            final BlobKey otherPermanentDataBlobKey = BlobServerPutTest.put(testInstance, otherJobId, createRandomData(), BlobKey.BlobType.PERMANENT_BLOB);

            // precondition: everything that was just stored is present
            TestingBlobHelpers.checkFilesExist(jobId, Arrays.asList(transientDataBlobKey, permanentDataBlobKey), testInstance, true);
            TestingBlobHelpers.checkFilesExist(otherJobId, Arrays.asList(otherTransientDataBlobKey, otherPermanentDataBlobKey), testInstance, true);

            // arguments are passed with their declared types; the callback is fully parameterized
            callback.accept(testInstance, jobId, executorService);

            TestingBlobHelpers.checkFileCountForJob(expectedFileCountAfterCleanup, jobId, testInstance);
            // the unrelated job must not be affected
            TestingBlobHelpers.checkFilesExist(otherJobId, Arrays.asList(otherTransientDataBlobKey, otherPermanentDataBlobKey), testInstance, true);
        } finally {
            // no task may be left pending in the executor
            Assertions.assertThat(executorService.shutdownNow()).isEmpty();
        }
    }

    /** Runs the recovered-transient-blob expiry scenario with a freshly generated job ID. */
    @Test
    void testBlobServerExpiresRecoveredTransientJobBlob() throws Exception {
        final JobID jobId = new JobID();
        runBlobServerExpiresRecoveredTransientBlob(jobId);
    }

    /** Runs the recovered-transient-blob expiry scenario with a {@code null} job ID. */
    @Test
    void testBlobServerExpiresRecoveredTransientNoJobBlob() throws Exception {
        final JobID noJob = null;
        runBlobServerExpiresRecoveredTransientBlob(noJob);
    }

    /**
     * Writes a transient blob into the storage directory before any server exists, then creates a
     * {@link BlobServer} on that directory with a cleanup interval of 1 and waits until the blob
     * file no longer exists.
     *
     * @param jobId job to write the blob for, or {@code null} for no job
     * @throws Exception if writing the blob, creating the server or waiting fails
     */
    private void runBlobServerExpiresRecoveredTransientBlob(@Nullable JobID jobId) throws Exception {
        final long cleanupInterval = 1L;
        final byte[] blobContent = new byte[] {1, 2, 3, 4};

        final TransientBlobKey transientBlobKey = TestingBlobUtils.writeTransientBlob(temporaryFolder.toPath(), jobId, blobContent);
        final File blob = BlobUtils.getStorageLocation(temporaryFolder, jobId, transientBlobKey);

        // the server instance itself is not used; it only has to exist while waiting
        try (BlobServer blobServer = createTestInstance(temporaryFolder.getAbsolutePath(), cleanupInterval)) {
            CommonTestUtils.waitUntilCondition(() -> !blob.exists());
        }
    }

    /*
     * WARNING (CFR decompiler) - a try block that caught itself was removed here; behaviour may differ from the original.
     */
    /**
     * Writes one permanent blob for each of two jobs, creates a {@link BlobServer} on that
     * directory and calls {@code retainJobs} with only the first job. Afterwards the first job's
     * blob must still be readable with its original content, while fetching the second job's blob
     * must fail with a {@link NoSuchFileException}.
     */
    @Test
    void testBlobServerRetainsJobs() throws Exception {
        final JobID retainedJobId = new JobID();
        final JobID droppedJobId = new JobID();
        final byte[] fileContent = new byte[] {1, 2, 3, 4};

        final PermanentBlobKey retainedBlobKey = TestingBlobUtils.writePermanentBlob(temporaryFolder.toPath(), retainedJobId, fileContent);
        final PermanentBlobKey droppedBlobKey = TestingBlobUtils.writePermanentBlob(temporaryFolder.toPath(), droppedJobId, fileContent);

        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        final long defaultCleanupInterval = BlobServerOptions.CLEANUP_INTERVAL.defaultValue();

        try (BlobServer blobServer = createTestInstance(temporaryFolder.getAbsolutePath(), defaultCleanupInterval)) {
            blobServer.retainJobs(Collections.singleton(retainedJobId), executorService);

            Assertions.assertThat(blobServer.getFile(retainedJobId, retainedBlobKey)).hasBinaryContent(fileContent);
            Assertions.assertThatThrownBy(() -> blobServer.getFile(droppedJobId, droppedBlobKey))
                    .isInstanceOf(NoSuchFileException.class);
        } finally {
            // no task may be left pending in the executor
            Assertions.assertThat(executorService.shutdownNow()).isEmpty();
        }
    }
}

