/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.index;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexSpilledRegionManager;
import org.apache.flink.util.ExceptionUtils;

/**
 * File-backed implementation of {@link FileDataIndexSpilledRegionManager}.
 *
 * <p>Regions are written to a single index file. The file is carved into "region groups": each
 * group is a contiguous range of bytes that belongs to exactly one subpartition and holds the
 * regions of that subpartition back to back. For every subpartition this class tracks the group
 * currently being filled plus all groups that were filled earlier (keyed by the smallest buffer
 * index they contain), so a lookup only has to read the groups that can contain the requested
 * buffer index.
 *
 * <p>Lookup methods return {@code -1} when no spilled region contains the requested buffer.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to the channel or to the
 * bookkeeping arrays; callers presumably serialize access externally - confirm.
 *
 * @param <T> the concrete region type handled by the configured {@link FileDataIndexRegionHelper}
 */
public class FileDataIndexSpilledRegionManagerImpl<T extends FileDataIndexRegionHelper.Region>
        implements FileDataIndexSpilledRegionManager<T> {

    /** Value returned by the lookup methods when no region contains the requested buffer. */
    private static final long REGION_NOT_FOUND = -1L;

    /**
     * Per subpartition: region groups that are no longer being appended to, keyed by the minimum
     * buffer index recorded for the group.
     */
    private final List<TreeMap<Integer, RegionGroup>> subpartitionFinishedRegionGroupMetas;

    /** Channel of the index file; opened for reading and writing in the constructor. */
    private FileChannel channel;

    /** File offset at which the next newly started region group will begin. */
    private long nextRegionGroupOffset = 0L;

    /** Per subpartition: file offset at which the next appended region will be written. */
    private final long[] subpartitionCurrentOffset;

    /** Per subpartition: bytes still unused in the region group currently being filled. */
    private final int[] subpartitionFreeSpaceInBytes;

    /** Per subpartition: the region group currently being filled, or {@code null} if none yet. */
    private final RegionGroup[] currentRegionGroup;

    /**
     * Default size of a region group. A region larger than this gets a group of exactly its own
     * size (see {@link #appendRegion}).
     */
    private final int regionGroupSizeInBytes;

    /** Receives {@code (subpartition, region)} for every region that should be put into the cache. */
    private final BiConsumer<Integer, T> cacheRegionConsumer;

    /** Serializes regions to, and deserializes regions from, the index file. */
    private final FileDataIndexRegionHelper<T> fileDataIndexRegionHelper;

    /**
     * Whether a cache load hands every region of the containing region group to the cache
     * consumer, rather than only the region that was looked up.
     */
    private final boolean loadEntireRegionGroupToCache;

    /**
     * Creates the manager and opens the index file.
     *
     * <p>The file is opened with {@link StandardOpenOption#CREATE_NEW}, so opening fails if a file
     * already exists at {@code indexFilePath}. An {@link IOException} from opening is handed to
     * {@link ExceptionUtils#rethrow(Throwable)}.
     *
     * @param numSubpartitions number of subpartitions whose regions are managed
     * @param indexFilePath path of the index file to create
     * @param regionGroupSizeInBytes default size of a region group in bytes
     * @param maxCacheCapacity cache capacity used to decide whether whole region groups are loaded
     * @param regionHeaderSize region header size in bytes, used in the same decision
     * @param cacheRegionConsumer callback invoked with {@code (subpartition, region)} for each
     *     region to load into the cache
     * @param fileDataIndexRegionHelper helper that reads and writes regions on the file channel
     */
    public FileDataIndexSpilledRegionManagerImpl(
            int numSubpartitions,
            Path indexFilePath,
            int regionGroupSizeInBytes,
            long maxCacheCapacity,
            int regionHeaderSize,
            BiConsumer<Integer, T> cacheRegionConsumer,
            FileDataIndexRegionHelper<T> fileDataIndexRegionHelper) {
        this.loadEntireRegionGroupToCache =
                shouldLoadEntireRegionGroupToCache(
                        numSubpartitions, regionGroupSizeInBytes, maxCacheCapacity, regionHeaderSize);
        this.subpartitionFinishedRegionGroupMetas = new ArrayList<>(numSubpartitions);
        this.subpartitionCurrentOffset = new long[numSubpartitions];
        this.subpartitionFreeSpaceInBytes = new int[numSubpartitions];
        this.currentRegionGroup = new RegionGroup[numSubpartitions];
        for (int i = 0; i < numSubpartitions; ++i) {
            this.subpartitionFinishedRegionGroupMetas.add(new TreeMap<>());
        }
        this.cacheRegionConsumer = cacheRegionConsumer;
        this.fileDataIndexRegionHelper = fileDataIndexRegionHelper;
        this.regionGroupSizeInBytes = regionGroupSizeInBytes;

        // Open the file last: everything above can throw on invalid arguments (e.g. a zero
        // regionHeaderSize or a negative numSubpartitions), and a channel opened before such a
        // failure could never be closed because the instance is never handed to a caller.
        try {
            this.channel =
                    FileChannel.open(
                            indexFilePath,
                            StandardOpenOption.CREATE_NEW,
                            StandardOpenOption.READ,
                            StandardOpenOption.WRITE);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e);
        }
    }

    /**
     * Looks up the file offset of the spilled region that contains {@code bufferIndex}.
     *
     * <p>The region group currently being filled for the subpartition is searched first, then
     * every finished group whose recorded minimum buffer index is less than or equal to
     * {@code bufferIndex}.
     *
     * @param subpartition subpartition to search
     * @param bufferIndex buffer index that the region must contain
     * @param loadToCache whether a found region should be passed to the cache consumer
     * @return the file offset of the containing region, or {@code -1} if none was found
     * @throws RuntimeException wrapping the {@link IOException} if reading the index file fails
     */
    @Override
    public long findRegion(int subpartition, int bufferIndex, boolean loadToCache) {
        RegionGroup current = currentRegionGroup[subpartition];
        if (current != null) {
            long offsetInCurrent =
                    findRegionInRegionGroup(subpartition, bufferIndex, current, loadToCache);
            if (offsetInCurrent != REGION_NOT_FOUND) {
                return offsetInCurrent;
            }
        }

        TreeMap<Integer, RegionGroup> finishedGroups =
                subpartitionFinishedRegionGroupMetas.get(subpartition);
        // Only groups whose minimum buffer index is <= bufferIndex can contain the target.
        for (RegionGroup meta : finishedGroups.headMap(bufferIndex, true).values()) {
            long offsetInFinished =
                    findRegionInRegionGroup(subpartition, bufferIndex, meta, loadToCache);
            if (offsetInFinished != REGION_NOT_FOUND) {
                return offsetInFinished;
            }
        }
        return REGION_NOT_FOUND;
    }

    /**
     * Searches a single region group, skipping the file read entirely when {@code bufferIndex}
     * is greater than the group's recorded maximum buffer index.
     *
     * @return the file offset of the containing region, or {@code -1} if the group has none
     * @throws RuntimeException wrapping the {@link IOException} if reading the index file fails
     */
    private long findRegionInRegionGroup(
            int subpartition, int bufferIndex, RegionGroup meta, boolean loadToCache) {
        if (bufferIndex > meta.getMaxBufferIndex()) {
            return REGION_NOT_FOUND;
        }
        try {
            return readRegionGroupAndLoadToCacheIfNeeded(
                    subpartition, bufferIndex, meta, loadToCache);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads every region of {@code meta} from the file, locates the one containing
     * {@code bufferIndex} and, if requested, passes regions to the cache consumer.
     *
     * <p>When a region is found and {@code loadToCache} is set, either only that region is passed
     * to the consumer, or - if whole-group loading is enabled - all other regions of the group
     * are passed first and the found region last.
     *
     * @return the file offset of the containing region, or {@code -1} if the group has none
     * @throws IOException if reading the index file fails
     */
    private long readRegionGroupAndLoadToCacheIfNeeded(
            int subpartition, int bufferIndex, RegionGroup meta, boolean loadToCache)
            throws IOException {
        List<Tuple2<T, Long>> regionAndOffsets =
                readRegionGroup(meta.getOffset(), meta.getNumRegions());

        long targetRegionOffset = REGION_NOT_FOUND;
        T targetRegion = null;
        Iterator<Tuple2<T, Long>> it = regionAndOffsets.iterator();
        while (it.hasNext()) {
            Tuple2<T, Long> regionAndOffset = it.next();
            T region = regionAndOffset.f0;
            if (region.containBuffer(bufferIndex)) {
                targetRegion = region;
                targetRegionOffset = regionAndOffset.f1;
                // Take the match out of the list so that only the other regions remain in it.
                it.remove();
            }
        }

        if (targetRegion != null && loadToCache) {
            if (loadEntireRegionGroupToCache) {
                for (Tuple2<T, Long> other : regionAndOffsets) {
                    cacheRegionConsumer.accept(subpartition, other.f0);
                }
            }
            // The target is always handed over last.
            // NOTE(review): presumably so that it is the most recently cached entry - confirm
            // against the cache's eviction policy.
            cacheRegionConsumer.accept(subpartition, targetRegion);
        }
        return targetRegionOffset;
    }

    /**
     * Writes {@code newRegion} to the index file.
     *
     * <p>If a spilled region already contains the new region's first buffer index, the new
     * region is written at that region's offset; otherwise it is appended to the subpartition's
     * current region group. The lookup never loads anything into the cache.
     *
     * @param subpartition subpartition the region belongs to
     * @param newRegion region to write
     * @throws IOException if writing to the index file fails
     */
    @Override
    public void appendOrOverwriteRegion(int subpartition, T newRegion) throws IOException {
        long existingOffset = findRegion(subpartition, newRegion.getFirstBufferIndex(), false);
        if (existingOffset == REGION_NOT_FOUND) {
            appendRegion(subpartition, newRegion);
        } else {
            writeRegionToOffset(existingOffset, newRegion);
        }
    }

    /**
     * Closes the index file channel if it was opened.
     *
     * @throws IOException if closing the channel fails
     */
    @Override
    public void close() throws IOException {
        if (channel != null) {
            channel.close();
        }
    }

    /**
     * Decides whether cache loads should cover whole region groups.
     *
     * <p>Returns {@code true} when
     * {@code 2 * numSubpartitions * regionGroupSizeInBytes / regionHeaderSize} does not exceed
     * {@code maxCacheCapacity}.
     * NOTE(review): the left-hand side looks like an upper bound on the number of regions that
     * two region groups per subpartition can hold - confirm the intent.
     *
     * @throws ArithmeticException if {@code regionHeaderSize} is zero
     */
    private static boolean shouldLoadEntireRegionGroupToCache(
            int numSubpartitions,
            int regionGroupSizeInBytes,
            long maxCacheCapacity,
            int regionHeaderSize) {
        long estimate =
                2L * (long) numSubpartitions * (long) regionGroupSizeInBytes
                        / (long) regionHeaderSize;
        return estimate <= maxCacheCapacity;
    }

    /**
     * Appends {@code region} to the subpartition's current region group, starting a new group
     * first when the current one has less free space than the region needs.
     *
     * @throws IOException if writing to the index file fails
     */
    private void appendRegion(int subpartition, T region) throws IOException {
        int regionSize = region.getSize();
        if (subpartitionFreeSpaceInBytes[subpartition] < regionSize) {
            // A region larger than the default group size gets a group of exactly its own size.
            startNewRegionGroup(subpartition, Math.max(regionSize, regionGroupSizeInBytes));
        }
        writeRegionToOffset(subpartitionCurrentOffset[subpartition], region);
        updateRegionGroup(subpartition, region);
    }

    /**
     * Positions the channel at {@code offset} and lets the region helper write {@code region}.
     *
     * @throws IOException if positioning or writing fails
     */
    private void writeRegionToOffset(long offset, T region) throws IOException {
        channel.position(offset);
        fileDataIndexRegionHelper.writeRegionToFile(channel, region);
    }

    /**
     * Starts a fresh region group of {@code newRegionGroupSize} bytes for the subpartition at
     * the next free file offset. The group that was current until now, if any, is recorded as
     * finished under its minimum buffer index.
     */
    private void startNewRegionGroup(int subpartition, int newRegionGroupSize) {
        RegionGroup finishedGroup = currentRegionGroup[subpartition];

        currentRegionGroup[subpartition] = new RegionGroup(nextRegionGroupOffset);
        subpartitionCurrentOffset[subpartition] = nextRegionGroupOffset;
        subpartitionFreeSpaceInBytes[subpartition] = newRegionGroupSize;
        nextRegionGroupOffset += (long) newRegionGroupSize;

        if (finishedGroup != null) {
            subpartitionFinishedRegionGroupMetas
                    .get(subpartition)
                    .put(finishedGroup.getMinBufferIndex(), finishedGroup);
        }
    }

    /**
     * Accounts for a region that was just appended: consumes its size from the current group's
     * free space, advances the write offset, and records its buffer index range in the group.
     */
    private void updateRegionGroup(int subpartition, T region) {
        int regionSize = region.getSize();
        subpartitionFreeSpaceInBytes[subpartition] -= regionSize;
        subpartitionCurrentOffset[subpartition] += (long) regionSize;

        int firstBufferIndex = region.getFirstBufferIndex();
        int lastBufferIndex = firstBufferIndex + region.getNumBuffers() - 1;
        currentRegionGroup[subpartition].addRegion(firstBufferIndex, lastBufferIndex);
    }

    /**
     * Reads {@code numRegions} consecutive regions starting at {@code offset}.
     *
     * @return the regions in file order, each paired with the file offset it was read from
     * @throws IOException if reading the index file fails
     */
    private List<Tuple2<T, Long>> readRegionGroup(long offset, int numRegions) throws IOException {
        List<Tuple2<T, Long>> regionAndOffsets = new ArrayList<>();
        long regionOffset = offset;
        for (int i = 0; i < numRegions; ++i) {
            T region = fileDataIndexRegionHelper.readRegionFromFile(channel, regionOffset);
            regionAndOffsets.add(Tuple2.of(region, regionOffset));
            // Regions are stored back to back, so the next one starts right after this one.
            regionOffset += (long) region.getSize();
        }
        return regionAndOffsets;
    }

    /**
     * In-memory metadata of one region group: where it starts in the file, how many regions it
     * holds, and the range of buffer indexes those regions span.
     */
    private static class RegionGroup {
        /** Smallest first-buffer index added so far; {@link Integer#MAX_VALUE} while empty. */
        private int minBufferIndex;

        /** Largest last-buffer index added so far; {@code 0} while empty. */
        private int maxBufferIndex;

        /** Number of regions added to this group. */
        private int numRegions;

        /** File offset at which this group starts. */
        private final long offset;

        /**
         * Creates an empty group starting at {@code offset}.
         *
         * @param offset file offset of the first byte of the group
         */
        public RegionGroup(long offset) {
            this.offset = offset;
            this.minBufferIndex = Integer.MAX_VALUE;
            this.maxBufferIndex = 0;
            this.numRegions = 0;
        }

        /** Returns the smallest first-buffer index recorded for this group. */
        public int getMinBufferIndex() {
            return minBufferIndex;
        }

        /** Returns the largest last-buffer index recorded for this group. */
        public int getMaxBufferIndex() {
            return maxBufferIndex;
        }

        /** Returns the file offset at which this group starts. */
        public long getOffset() {
            return offset;
        }

        /** Returns the number of regions recorded for this group. */
        public int getNumRegions() {
            return numRegions;
        }

        /**
         * Records one more region, widening the tracked buffer index range if necessary.
         *
         * @param firstBufferIndexOfRegion first buffer index covered by the region
         * @param maxBufferIndexOfRegion last buffer index covered by the region
         */
        public void addRegion(int firstBufferIndexOfRegion, int maxBufferIndexOfRegion) {
            minBufferIndex = Math.min(minBufferIndex, firstBufferIndexOfRegion);
            maxBufferIndex = Math.max(maxBufferIndex, maxBufferIndexOfRegion);
            ++numRegions;
        }
    }

    /**
     * Factory that holds the settings shared by all managers and creates a
     * {@link FileDataIndexSpilledRegionManagerImpl} per index file.
     *
     * @param <T> the concrete region type handled by the configured helper
     */
    public static class Factory<T extends FileDataIndexRegionHelper.Region>
            implements FileDataIndexSpilledRegionManager.Factory<T> {
        private final int regionGroupSizeInBytes;
        private final long maxCacheCapacity;
        private final int regionHeaderSize;
        private final FileDataIndexRegionHelper<T> fileDataIndexRegionHelper;

        /**
         * @param regionGroupSizeInBytes default size of a region group in bytes
         * @param maxCacheCapacity cache capacity used to decide whether whole region groups are
         *     loaded into the cache
         * @param regionHeaderSize region header size in bytes, used in the same decision
         * @param fileDataIndexRegionHelper helper that reads and writes regions
         */
        public Factory(
                int regionGroupSizeInBytes,
                long maxCacheCapacity,
                int regionHeaderSize,
                FileDataIndexRegionHelper<T> fileDataIndexRegionHelper) {
            this.regionGroupSizeInBytes = regionGroupSizeInBytes;
            this.maxCacheCapacity = maxCacheCapacity;
            this.regionHeaderSize = regionHeaderSize;
            this.fileDataIndexRegionHelper = fileDataIndexRegionHelper;
        }

        /**
         * Creates a manager backed by a new index file at {@code indexFilePath}.
         *
         * @param numSubpartitions number of subpartitions whose regions are managed
         * @param indexFilePath path of the index file to create
         * @param cacheRegionConsumer callback invoked with {@code (subpartition, region)} for
         *     each region to load into the cache
         * @return the newly created manager
         */
        @Override
        public FileDataIndexSpilledRegionManager<T> create(
                int numSubpartitions,
                Path indexFilePath,
                BiConsumer<Integer, T> cacheRegionConsumer) {
            return new FileDataIndexSpilledRegionManagerImpl<>(
                    numSubpartitions,
                    indexFilePath,
                    regionGroupSizeInBytes,
                    maxCacheCapacity,
                    regionHeaderSize,
                    cacheRegionConsumer,
                    fileDataIndexRegionHelper);
        }
    }
}

