/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.index;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager;
import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManagerImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion;
import org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegionHelper;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link FileDataIndexSpilledRegionManagerImpl}.
 *
 * <p>Each test creates a manager backed by a fresh index file inside a JUnit-managed temporary
 * directory, writes regions through the manager and then verifies the result either through the
 * manager itself or by reading the index file back directly.
 */
class FileDataIndexSpilledRegionManagerImplTest {
    /** Region group size used by tests that do not choose their own. */
    private static final int DEFAULT_REGION_GROUP_SIZE = 256;

    /** Offset value the tests expect from {@code findRegion} when no region matches. */
    private static final long REGION_NOT_FOUND = -1L;

    /** Location of the index file for the current test; re-created before every test. */
    private Path indexFilePath;

    /**
     * Picks a unique, not-yet-existing index file path inside the per-test temporary directory.
     *
     * @param tmpPath temporary directory injected by JUnit
     */
    @BeforeEach
    void before(@TempDir Path tmpPath) {
        this.indexFilePath = tmpPath.resolve(UUID.randomUUID().toString());
    }

    /**
     * Looking up a region in an empty index must report {@code -1} and must not invoke the cache
     * consumer.
     */
    @Test
    void testFindNonExistentRegion() throws Exception {
        CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>();
        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager =
                this.createSpilledRegionManager(
                        (ignore1, ignore2) -> cachedRegionFuture.complete(null))) {
            long regionOffset = spilledRegionManager.findRegion(0, 0, true);
            Assertions.assertThat(regionOffset).isEqualTo(REGION_NOT_FOUND);
            Assertions.assertThat(cachedRegionFuture).isNotCompleted();
        }
    }

    /**
     * Writes a region, reads it back from offset 0 of the index file, then writes a second region
     * with the same first buffer index and checks that offset 0 now holds the second region.
     * Neither write may invoke the cache consumer.
     */
    @Test
    void testAppendOrOverwriteRegion() throws Exception {
        CompletableFuture<Void> cachedRegionFuture = new CompletableFuture<>();
        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager =
                this.createSpilledRegionManager(
                        (ignore1, ignore2) -> cachedRegionFuture.complete(null))) {
            TestingFileDataIndexRegion region =
                    HybridShuffleTestUtils.createSingleTestRegion(0, 0L, 1);
            spilledRegionManager.appendOrOverwriteRegion(0, region);
            Assertions.assertThat(cachedRegionFuture).isNotCompleted();

            // The read channel is opened only after the first write and is closed by
            // try-with-resources so the temporary directory can be cleaned up reliably.
            try (FileChannel indexFileChannel =
                    FileChannel.open(this.indexFilePath, StandardOpenOption.READ)) {
                TestingFileDataIndexRegion readRegion =
                        TestingFileDataIndexRegion.readRegionFromFile(indexFileChannel, 0L);
                HybridShuffleTestUtils.assertRegionEquals(readRegion, region);

                TestingFileDataIndexRegion newRegion =
                        HybridShuffleTestUtils.createSingleTestRegion(0, 10L, 1);
                spilledRegionManager.appendOrOverwriteRegion(0, newRegion);
                Assertions.assertThat(cachedRegionFuture).isNotCompleted();

                TestingFileDataIndexRegion readNewRegion =
                        TestingFileDataIndexRegion.readRegionFromFile(indexFileChannel, 0L);
                HybridShuffleTestUtils.assertRegionEquals(readNewRegion, newRegion);
            }
        }
    }

    /**
     * Uses a region group size one larger than the combined size of two regions, writes those two
     * regions followed by a third, and expects the third region to be stored at file offset
     * {@code regionGroupSize}.
     */
    @Test
    void testWriteMoreThanOneRegionGroup() throws Exception {
        List<TestingFileDataIndexRegion> regions =
                HybridShuffleTestUtils.createTestRegions(0, 0L, 2, 2);
        int regionGroupSize =
                regions.stream().mapToInt(TestingFileDataIndexRegion::getSize).sum() + 1;
        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager =
                this.createSpilledRegionManager(regionGroupSize, (ignore1, ignore2) -> {})) {
            spilledRegionManager.appendOrOverwriteRegion(0, regions.get(0));
            spilledRegionManager.appendOrOverwriteRegion(0, regions.get(1));

            TestingFileDataIndexRegion regionInNewRegionGroup =
                    HybridShuffleTestUtils.createSingleTestRegion(4, 4L, 2);
            spilledRegionManager.appendOrOverwriteRegion(0, regionInNewRegionGroup);

            try (FileChannel indexFileChannel =
                    FileChannel.open(this.indexFilePath, StandardOpenOption.READ)) {
                TestingFileDataIndexRegion readRegion =
                        TestingFileDataIndexRegion.readRegionFromFile(
                                indexFileChannel, regionGroupSize);
                HybridShuffleTestUtils.assertRegionEquals(readRegion, regionInNewRegionGroup);
            }
        }
    }

    /**
     * Writes two regions that are each larger than the configured region group size and expects
     * them to be stored back-to-back: the first at offset 0, the second directly after it.
     */
    @Test
    void testWriteBigRegion() throws Exception {
        int regionGroupSize = 4;
        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager =
                this.createSpilledRegionManager(regionGroupSize, (ignore1, ignore2) -> {})) {
            List<TestingFileDataIndexRegion> regions =
                    HybridShuffleTestUtils.createTestRegions(0, 0L, 1, 2);
            TestingFileDataIndexRegion region1 = regions.get(0);
            TestingFileDataIndexRegion region2 = regions.get(1);

            // Precondition of the scenario: a single region does not fit into one group.
            Assertions.assertThat((int) region1.getSize()).isGreaterThan(regionGroupSize);
            Assertions.assertThat((int) region2.getSize()).isGreaterThan(regionGroupSize);

            spilledRegionManager.appendOrOverwriteRegion(0, region1);
            spilledRegionManager.appendOrOverwriteRegion(0, region2);

            try (FileChannel indexFileChannel =
                    FileChannel.open(this.indexFilePath, StandardOpenOption.READ)) {
                TestingFileDataIndexRegion readRegion1 =
                        TestingFileDataIndexRegion.readRegionFromFile(indexFileChannel, 0L);
                HybridShuffleTestUtils.assertRegionEquals(readRegion1, region1);

                TestingFileDataIndexRegion readRegion2 =
                        TestingFileDataIndexRegion.readRegionFromFile(
                                indexFileChannel, readRegion1.getSize());
                HybridShuffleTestUtils.assertRegionEquals(readRegion2, region2);
            }
        }
    }

    /**
     * Writes five regions with non-monotonic first buffer indexes and then looks up buffer index 3.
     * The lookup must succeed, must hand exactly two regions to the cache consumer, and the second
     * of those must be the region that starts at buffer index 2.
     */
    @Test
    void testFindRegionFirstBufferIndexInMultipleRegionGroups() throws Exception {
        final int numBuffersPerRegion = 2;
        final int subpartition = 0;
        // NOTE(review): 32 is presumably chosen so the five regions span several region groups,
        // as the test name suggests - confirm against the serialized region size.
        final int regionGroupSize = 32;
        List<TestingFileDataIndexRegion> loadedRegions = new ArrayList<>();
        try (FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> spilledRegionManager =
                this.createSpilledRegionManager(
                        regionGroupSize, (ignore, region) -> loadedRegions.add(region))) {
            spilledRegionManager.appendOrOverwriteRegion(
                    subpartition,
                    HybridShuffleTestUtils.createSingleTestRegion(0, 0L, numBuffersPerRegion));
            spilledRegionManager.appendOrOverwriteRegion(
                    subpartition,
                    HybridShuffleTestUtils.createSingleTestRegion(9, 9L, numBuffersPerRegion));
            TestingFileDataIndexRegion targetRegion =
                    HybridShuffleTestUtils.createSingleTestRegion(2, 2L, numBuffersPerRegion);
            spilledRegionManager.appendOrOverwriteRegion(subpartition, targetRegion);
            spilledRegionManager.appendOrOverwriteRegion(
                    subpartition,
                    HybridShuffleTestUtils.createSingleTestRegion(11, 11L, numBuffersPerRegion));
            spilledRegionManager.appendOrOverwriteRegion(
                    subpartition,
                    HybridShuffleTestUtils.createSingleTestRegion(7, 7L, numBuffersPerRegion));

            long regionOffset = spilledRegionManager.findRegion(subpartition, 3, true);
            Assertions.assertThat(regionOffset).isNotEqualTo(REGION_NOT_FOUND);
            Assertions.assertThat(loadedRegions).hasSize(2);
            HybridShuffleTestUtils.assertRegionEquals(loadedRegions.get(1), targetRegion);
        }
    }

    /**
     * Creates a manager with the default region group size.
     *
     * @param cacheRegionConsumer callback handed to the manager factory
     * @return a new manager writing to {@link #indexFilePath}
     */
    private FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> createSpilledRegionManager(
            BiConsumer<Integer, TestingFileDataIndexRegion> cacheRegionConsumer) {
        return this.createSpilledRegionManager(DEFAULT_REGION_GROUP_SIZE, cacheRegionConsumer);
    }

    /**
     * Creates a manager for two subpartitions that reads and writes regions via the static
     * {@link TestingFileDataIndexRegion} file helpers.
     *
     * @param regionGroupSize region group size passed to the factory
     * @param cacheRegionConsumer callback handed to the manager factory
     * @return a new manager writing to {@link #indexFilePath}
     */
    private FileDataIndexSpilledRegionManager<TestingFileDataIndexRegion> createSpilledRegionManager(
            int regionGroupSize,
            BiConsumer<Integer, TestingFileDataIndexRegion> cacheRegionConsumer) {
        int numSubpartitions = 2;
        // Typed locals let the compiler check the method references without unchecked casts.
        BiFunctionWithException<FileChannel, Long, TestingFileDataIndexRegion, IOException>
                readRegion = TestingFileDataIndexRegion::readRegionFromFile;
        BiConsumerWithException<FileChannel, TestingFileDataIndexRegion, IOException>
                writeRegion = TestingFileDataIndexRegion::writeRegionToFile;
        TestingFileDataIndexRegionHelper regionHelper =
                new TestingFileDataIndexRegionHelper.Builder()
                        .setReadRegionFromFileFunction(readRegion)
                        .setWriteRegionToFileConsumer(writeRegion)
                        .build();
        // NOTE(review): 16 is presumably the fixed header size of a serialized test region -
        // confirm against TestingFileDataIndexRegion.
        return new FileDataIndexSpilledRegionManagerImpl.Factory<TestingFileDataIndexRegion>(
                        regionGroupSize, Long.MAX_VALUE, 16, regionHelper)
                .create(numSubpartitions, this.indexFilePath, cacheRegionConsumer);
    }
}

