package org.apache.spark.shuffle.sort;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Iterators;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.spark.HashPartitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.internal.config.package$;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.io.LZ4CompressionCodec;
import org.apache.spark.io.LZFCompressionCodec;
import org.apache.spark.io.SnappyCompressionCodec;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TestMemoryManager;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.security.CryptoStreamUtils;
import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.util.Utils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.AbstractIterator;

/* loaded from: input_file:org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.class */
public class UnsafeShuffleWriterSuite {
    // Number of partitions handed to hashPartitioner below. The identifier is misspelled
    // ("PARTITITONS") but is referenced throughout this class, so the spelling is kept.
    static final int NUM_PARTITITONS = 4;
    // Memory manager and per-task memory manager; both are re-created in setUp().
    TestMemoryManager memoryManager;
    TaskMemoryManager taskMemoryManager;
    // Temp file inside tempDir; the mocked shuffleBlockResolver.getDataFile(...) returns it.
    File mergedOutputFile;
    // Per-test scratch directory; created in setUp() and deleted recursively in tearDown().
    File tempDir;
    // Captured from the third argument of the mocked writeIndexFileAndCommit(...);
    // reset to null in setUp() and stays null until that mock is invoked.
    long[] partitionSizesInMergedFile;
    // Fresh SparkConf built in setUp(); individual tests mutate it before creating a writer.
    SparkConf conf;
    // Returned by the mocked taskContext.taskMetrics().
    TaskMetrics taskMetrics;

    // The mocks below are initialised by MockitoAnnotations.initMocks(this) in setUp().
    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    BlockManager blockManager;

    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    IndexShuffleBlockResolver shuffleBlockResolver;

    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    DiskBlockManager diskBlockManager;

    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    TaskContext taskContext;

    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    ShuffleDependency<Object, Object, Object> shuffleDep;
    // Partitioner returned by the mocked shuffleDep.partitioner(); also used by
    // readRecordsFromFile() to check that each record landed in the expected partition.
    final HashPartitioner hashPartitioner = new HashPartitioner(NUM_PARTITITONS);
    // Every file handed out by the mocked diskBlockManager.createTempShuffleBlock(); cleared in setUp().
    final LinkedList<File> spillFilesCreated = new LinkedList<>();
    // Serializer returned by the mocked shuffleDep.serializer() and used to read records back.
    final Serializer serializer = new KryoSerializer(new SparkConf());

    /* loaded from: input_file:org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite$PandaException.class */
    /**
     * Marker exception thrown by test fixtures so a test can assert that this exact
     * failure (and not some other runtime error) propagated out of the writer.
     */
    static class PandaException extends RuntimeException {
        // Relies on the implicit no-arg constructor; no message or cause is carried.
    }

    /**
     * Deletes the per-test temp directory, then releases any memory still held by the
     * task memory manager and fails the test if that amount was non-zero.
     */
    @After
    public void tearDown() {
        Utils.deleteRecursively(this.tempDir);
        final long leakedBytes = this.taskMemoryManager.cleanUpAllAllocatedMemory();
        if (leakedBytes == 0) {
            return;
        }
        Assert.fail("Test leaked " + leakedBytes + " bytes of managed memory");
    }

    /**
     * Builds a fresh fixture for each test: temp directory and merged output file, a new
     * {@link SparkConf}, memory managers, and stubbed behaviour for the Mockito mocks.
     *
     * @throws IOException if the merged output temp file cannot be created
     */
    @Before
    public void setUp() throws IOException {
        MockitoAnnotations.initMocks(this);
        this.tempDir = Utils.createTempDir("test", "test");
        this.mergedOutputFile = File.createTempFile("mergedoutput", "", this.tempDir);
        this.partitionSizesInMergedFile = null;
        this.spillFilesCreated.clear();
        this.conf = new SparkConf()
            .set("spark.buffer.pageSize", "1m")
            .set("spark.memory.offHeap.enabled", "false");
        this.taskMetrics = new TaskMetrics();
        this.memoryManager = new TestMemoryManager(this.conf);
        this.taskMemoryManager = new TaskMemoryManager(this.memoryManager, 0L);

        Mockito.when(this.blockManager.serializerManager())
            .thenReturn(new SerializerManager(this.serializer, this.conf));
        Mockito.when(this.blockManager.diskBlockManager()).thenReturn(this.diskBlockManager);

        // getDiskWriter(blockId, file, serializerInstance, bufferSize, writeMetrics):
        // build a real DiskBlockObjectWriter from the arguments the code under test passed in.
        Mockito.when(this.blockManager.getDiskWriter(
                Mockito.any(BlockId.class),
                Mockito.any(File.class),
                Mockito.any(SerializerInstance.class),
                Mockito.anyInt(),
                Mockito.any(ShuffleWriteMetrics.class))).thenAnswer(invocation -> {
            final Object[] args = invocation.getArguments();
            return new DiskBlockObjectWriter(
                (File) args[1],
                this.blockManager.serializerManager(),
                (SerializerInstance) args[2],
                ((Integer) args[3]).intValue(),
                false,
                // Index 4 is the ShuffleWriteMetrics argument; it is a positional index and
                // unrelated to the partition count, which merely happens to be 4 as well.
                (ShuffleWriteMetrics) args[4],
                (BlockId) args[0]);
        });

        Mockito.when(this.shuffleBlockResolver.getDataFile(Mockito.anyInt(), Mockito.anyInt()))
            .thenReturn(this.mergedOutputFile);

        // writeIndexFileAndCommit(_, _, lengths, tmpFile): remember the partition lengths and
        // move the temp file over mergedOutputFile. The delete/rename results are not checked.
        Mockito.doAnswer(invocation -> {
            this.partitionSizesInMergedFile = (long[]) invocation.getArguments()[2];
            final File tmp = (File) invocation.getArguments()[3];
            this.mergedOutputFile.delete();
            tmp.renameTo(this.mergedOutputFile);
            return null;
        }).when(this.shuffleBlockResolver).writeIndexFileAndCommit(
            Mockito.anyInt(), Mockito.anyInt(), Mockito.any(long[].class), Mockito.any(File.class));

        // Each temp shuffle block is backed by a new file in tempDir; the file is recorded so
        // tests can later verify it was cleaned up.
        Mockito.when(this.diskBlockManager.createTempShuffleBlock()).thenAnswer(invocation -> {
            final TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
            final File spillFile = File.createTempFile("spillFile", ".spill", this.tempDir);
            this.spillFilesCreated.add(spillFile);
            return Tuple2$.MODULE$.apply(blockId, spillFile);
        });

        Mockito.when(this.taskContext.taskMetrics()).thenReturn(this.taskMetrics);
        Mockito.when(this.shuffleDep.serializer()).thenReturn(this.serializer);
        Mockito.when(this.shuffleDep.partitioner()).thenReturn(this.hashPartitioner);
    }

    /**
     * Creates a writer wired to the mocked collaborators of this suite.
     *
     * @param z value stored under {@code spark.file.transferTo} before the writer is built
     * @return a new writer for shuffle 0 / map 0
     * @throws IOException if constructing the writer fails
     */
    private UnsafeShuffleWriter<Object, Object> createWriter(boolean z) throws IOException {
        this.conf.set("spark.file.transferTo", String.valueOf(z));
        final SerializedShuffleHandle<Object, Object> handle =
            new SerializedShuffleHandle<>(0, 1, this.shuffleDep);
        return new UnsafeShuffleWriter<>(
            this.blockManager,
            this.shuffleBlockResolver,
            this.taskMemoryManager,
            handle,
            0,
            this.taskContext,
            this.conf);
    }

    /** Asserts that none of the spill files recorded in {@code spillFilesCreated} still exists. */
    private void assertSpillFilesWereCleanedUp() {
        for (final File spillFile : this.spillFilesCreated) {
            final String message = "Spill file " + spillFile.getPath() + " was not cleaned up";
            Assert.assertFalse(message, spillFile.exists());
        }
    }

    /**
     * Reads every record back from {@code mergedOutputFile}, one partition at a time, using
     * the lengths captured in {@code partitionSizesInMergedFile}. Each partition's bytes are
     * passed through the serializer manager's encryption wrapper and, when
     * {@code spark.shuffle.compress} is enabled, the configured codec, before deserialization.
     * Also asserts that every record's key hashes to the partition it was read from.
     *
     * @return all records in partition order
     * @throws IOException if the merged file cannot be opened, positioned or closed
     */
    private List<Tuple2<Object, Object>> readRecordsFromFile() throws IOException {
        final List<Tuple2<Object, Object>> recordsList = new ArrayList<>();
        long startOffset = 0;
        for (int i = 0; i < NUM_PARTITITONS; i++) {
            final long partitionSize = this.partitionSizesInMergedFile[i];
            if (partitionSize <= 0) {
                continue;
            }
            // try-with-resources guarantees the file handle is released even when wrapping,
            // deserialization or the partition assertion below throws.
            try (FileInputStream fin = new FileInputStream(this.mergedOutputFile)) {
                fin.getChannel().position(startOffset);
                InputStream in = this.blockManager.serializerManager()
                    .wrapForEncryption(new LimitedInputStream(fin, partitionSize));
                if (this.conf.getBoolean("spark.shuffle.compress", true)) {
                    in = CompressionCodec$.MODULE$.createCodec(this.conf).compressedInputStream(in);
                }
                final DeserializationStream recordsStream =
                    this.serializer.newInstance().deserializeStream(in);
                try {
                    final scala.collection.Iterator<Tuple2<Object, Object>> records =
                        recordsStream.asKeyValueIterator();
                    while (records.hasNext()) {
                        final Tuple2<Object, Object> kv = records.next();
                        Assert.assertEquals(i, this.hashPartitioner.getPartition(kv._1()));
                        recordsList.add(kv);
                    }
                } finally {
                    recordsStream.close();
                }
            }
            startOffset += partitionSize;
        }
        return recordsList;
    }

    /** Stopping with {@code success = true} before any write is expected to throw. */
    @Test(expected = IllegalStateException.class)
    public void mustCallWriteBeforeSuccessfulStop() throws IOException {
        final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
        writer.stop(true);
    }

    /** Stopping with {@code success = false} before any write must complete without throwing. */
    @Test
    public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
        final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
        writer.stop(false);
    }

    /**
     * Feeds the writer an iterator whose {@code hasNext()} throws and expects that exact
     * exception to surface from {@code write}.
     */
    @Test(expected = PandaException.class)
    public void writeFailurePropagates() throws Exception {
        final AbstractIterator<Product2<Object, Object>> badRecords =
            new AbstractIterator<Product2<Object, Object>>() {
                @Override
                public boolean hasNext() {
                    throw new PandaException();
                }

                // Must be named next(): a differently named method leaves the abstract
                // Iterator.next() unimplemented and the anonymous class does not compile.
                @Override
                public Product2<Object, Object> next() {
                    return null;
                }
            };
        createWriter(true).write(badRecords);
    }

    /**
     * Writing no records must still produce a merged output file, all-zero partition
     * lengths and zeroed write/spill metrics.
     */
    @Test
    public void writeEmptyIterator() throws Exception {
        final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
        final Iterator<Product2<Object, Object>> noRecords = Iterators.emptyIterator();
        writer.write(noRecords);

        final Option<?> status = writer.stop(true);
        Assert.assertTrue(status.isDefined());
        Assert.assertTrue(this.mergedOutputFile.exists());

        final long[] expectedSizes = new long[NUM_PARTITITONS];
        Assert.assertArrayEquals(expectedSizes, this.partitionSizesInMergedFile);

        final ShuffleWriteMetrics writeMetrics = this.taskMetrics.shuffleWriteMetrics();
        Assert.assertEquals(0L, writeMetrics.recordsWritten());
        Assert.assertEquals(0L, writeMetrics.bytesWritten());
        Assert.assertEquals(0L, this.taskMetrics.diskBytesSpilled());
        Assert.assertEquals(0L, this.taskMetrics.memoryBytesSpilled());
    }

    /**
     * Writes one small record per partition key and checks the merged file: equal partition
     * sizes that sum to the file length, the same records read back, and no spill metrics.
     */
    @Test
    public void writeWithoutSpilling() throws Exception {
        final List<Product2<Object, Object>> dataToWrite = new ArrayList<>();
        int key = 0;
        while (key < NUM_PARTITITONS) {
            dataToWrite.add(new Tuple2<Object, Object>(key, key));
            key++;
        }

        final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
        writer.write(dataToWrite.iterator());
        Assert.assertTrue(writer.stop(true).isDefined());
        Assert.assertTrue(this.mergedOutputFile.exists());

        // Every partition must have the same size as the first one.
        final long[] sizes = this.partitionSizesInMergedFile;
        long sumOfPartitionSizes = 0;
        for (int p = 0; p < sizes.length; p++) {
            Assert.assertEquals(sizes[0], sizes[p]);
            sumOfPartitionSizes += sizes[p];
        }
        Assert.assertEquals(this.mergedOutputFile.length(), sumOfPartitionSizes);

        Assert.assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
        assertSpillFilesWereCleanedUp();

        final ShuffleWriteMetrics writeMetrics = this.taskMetrics.shuffleWriteMetrics();
        Assert.assertEquals(dataToWrite.size(), writeMetrics.recordsWritten());
        Assert.assertEquals(0L, this.taskMetrics.diskBytesSpilled());
        Assert.assertEquals(0L, this.taskMetrics.memoryBytesSpilled());
        Assert.assertEquals(this.mergedOutputFile.length(), writeMetrics.bytesWritten());
    }

    /**
     * Configures compression and encryption on {@code conf}, re-stubs the serializer manager
     * accordingly, then runs the shared spill-merging scenario.
     *
     * @param z   forwarded to the two-argument overload (ends up as {@code spark.file.transferTo})
     * @param str compression codec class name, or {@code null} to disable shuffle compression
     * @param z2  whether I/O encryption is enabled
     */
    private void testMergingSpills(boolean z, String str, boolean z2) throws Exception {
        final boolean compress = str != null;
        this.conf.set("spark.shuffle.compress", compress ? "true" : "false");
        if (compress) {
            this.conf.set("spark.io.compression.codec", str);
        }
        this.conf.set(package$.MODULE$.IO_ENCRYPTION_ENABLED(), Boolean.valueOf(z2));

        final SerializerManager manager;
        if (z2) {
            manager = new SerializerManager(
                this.serializer, this.conf, Option.apply(CryptoStreamUtils.createKey(this.conf)));
        } else {
            manager = new SerializerManager(this.serializer, this.conf);
        }
        Mockito.when(this.blockManager.serializerManager()).thenReturn(manager);

        testMergingSpills(z, z2);
    }

    /**
     * Shared scenario: insert four records, force a spill, insert two more, then close the
     * writer and verify the merged output, spill-file cleanup and metrics.
     *
     * @param z  passed to {@code createWriter}, which stores it under {@code spark.file.transferTo}
     * @param z2 encryption flag; not read here, because the three-argument overload has already
     *           applied it to {@code conf} and the serializer manager before delegating
     * @throws IOException if writing or reading the shuffle files fails
     */
    private void testMergingSpills(boolean z, boolean z2) throws IOException {
        final UnsafeShuffleWriter<Object, Object> writer = createWriter(z);

        // Record keys/values are plain data; the 4s are literal values, not the partition count.
        final List<Product2<Object, Object>> dataToWrite = new ArrayList<>();
        for (int k : new int[]{1, 2, 3, 4, 4, 2}) {
            dataToWrite.add(new Tuple2<Object, Object>(k, k));
        }

        // First four records go into the first spill ...
        writer.insertRecordIntoSorter(dataToWrite.get(0));
        writer.insertRecordIntoSorter(dataToWrite.get(1));
        writer.insertRecordIntoSorter(dataToWrite.get(2));
        writer.insertRecordIntoSorter(dataToWrite.get(3));
        writer.forceSorterToSpill();
        // ... and the remaining two are still in the sorter when the output is closed.
        writer.insertRecordIntoSorter(dataToWrite.get(4));
        writer.insertRecordIntoSorter(dataToWrite.get(5));
        writer.closeAndWriteOutput();

        Assert.assertTrue(writer.stop(true).isDefined());
        Assert.assertTrue(this.mergedOutputFile.exists());
        Assert.assertEquals(2L, this.spillFilesCreated.size());

        long sumOfPartitionSizes = 0;
        for (long size : this.partitionSizesInMergedFile) {
            sumOfPartitionSizes += size;
        }
        Assert.assertEquals(sumOfPartitionSizes, this.mergedOutputFile.length());

        Assert.assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
        assertSpillFilesWereCleanedUp();

        final ShuffleWriteMetrics writeMetrics = this.taskMetrics.shuffleWriteMetrics();
        Assert.assertEquals(dataToWrite.size(), writeMetrics.recordsWritten());
        MatcherAssert.assertThat(Long.valueOf(this.taskMetrics.diskBytesSpilled()), Matchers.greaterThan(0L));
        MatcherAssert.assertThat(
            Long.valueOf(this.taskMetrics.diskBytesSpilled()),
            Matchers.lessThan(Long.valueOf(this.mergedOutputFile.length())));
        MatcherAssert.assertThat(Long.valueOf(this.taskMetrics.memoryBytesSpilled()), Matchers.greaterThan(0L));
        Assert.assertEquals(this.mergedOutputFile.length(), writeMetrics.bytesWritten());
    }

    /** Spill merge with transferTo enabled, LZF compression, no encryption. */
    @Test
    public void mergeSpillsWithTransferToAndLZF() throws Exception {
        final String codecClassName = LZFCompressionCodec.class.getName();
        testMergingSpills(true, codecClassName, false);
    }

    /** Spill merge with transferTo disabled, LZF compression, no encryption. */
    @Test
    public void mergeSpillsWithFileStreamAndLZF() throws Exception {
        final String codecClassName = LZFCompressionCodec.class.getName();
        testMergingSpills(false, codecClassName, false);
    }

    /** Spill merge with transferTo enabled, LZ4 compression, no encryption. */
    @Test
    public void mergeSpillsWithTransferToAndLZ4() throws Exception {
        final String codecClassName = LZ4CompressionCodec.class.getName();
        testMergingSpills(true, codecClassName, false);
    }

    /** Spill merge with transferTo disabled, LZ4 compression, no encryption. */
    @Test
    public void mergeSpillsWithFileStreamAndLZ4() throws Exception {
        final String codecClassName = LZ4CompressionCodec.class.getName();
        testMergingSpills(false, codecClassName, false);
    }

    /** Spill merge with transferTo enabled, Snappy compression, no encryption. */
    @Test
    public void mergeSpillsWithTransferToAndSnappy() throws Exception {
        final String codecClassName = SnappyCompressionCodec.class.getName();
        testMergingSpills(true, codecClassName, false);
    }

    /** Spill merge with transferTo disabled, Snappy compression, no encryption. */
    @Test
    public void mergeSpillsWithFileStreamAndSnappy() throws Exception {
        final String codecClassName = SnappyCompressionCodec.class.getName();
        testMergingSpills(false, codecClassName, false);
    }

    /** Spill merge with transferTo enabled, shuffle compression off, no encryption. */
    @Test
    public void mergeSpillsWithTransferToAndNoCompression() throws Exception {
        final String noCodec = null;
        testMergingSpills(true, noCodec, false);
    }

    /** Spill merge with transferTo disabled, shuffle compression off, no encryption. */
    @Test
    public void mergeSpillsWithFileStreamAndNoCompression() throws Exception {
        final String noCodec = null;
        testMergingSpills(false, noCodec, false);
    }

    /** Spill merge with transferTo enabled, LZ4 compression and I/O encryption. */
    @Test
    public void mergeSpillsWithCompressionAndEncryption() throws Exception {
        final String codecClassName = LZ4CompressionCodec.class.getName();
        testMergingSpills(true, codecClassName, true);
    }

    /** Spill merge with transferTo disabled, LZ4 compression and I/O encryption. */
    @Test
    public void mergeSpillsWithFileStreamAndCompressionAndEncryption() throws Exception {
        final String codecClassName = LZ4CompressionCodec.class.getName();
        testMergingSpills(false, codecClassName, true);
    }

    /**
     * Same as the file-stream LZ4 + encryption case, but with
     * {@code spark.shuffle.unsafe.fastMergeEnabled} switched off first.
     */
    @Test
    public void mergeSpillsWithCompressionAndEncryptionSlowPath() throws Exception {
        this.conf.set("spark.shuffle.unsafe.fastMergeEnabled", "false");
        final String codecClassName = LZ4CompressionCodec.class.getName();
        testMergingSpills(false, codecClassName, true);
    }

    /** Spill merge with transferTo enabled, shuffle compression off, I/O encryption on. */
    @Test
    public void mergeSpillsWithEncryptionAndNoCompression() throws Exception {
        final String noCodec = null;
        testMergingSpills(true, noCodec, true);
    }

    /** Spill merge with transferTo disabled, shuffle compression off, I/O encryption on. */
    @Test
    public void mergeSpillsWithFileStreamAndEncryptionAndNoCompression() throws Exception {
        final String noCodec = null;
        testMergingSpills(false, noCodec, true);
    }

    /**
     * Caps managed memory at 128 MiB, then writes eleven records that each carry a payload
     * of one tenth of that cap, and expects two spill files plus non-zero spill metrics.
     */
    @Test
    public void writeEnoughDataToTriggerSpill() throws Exception {
        final int memoryLimitBytes = 134217728; // 128 MiB
        final int payloadBytes = memoryLimitBytes / 10; // 13421772
        final int recordCount = 10 + 1;

        this.memoryManager.limit((long) memoryLimitBytes);
        final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);

        // All records share one payload array; only the keys differ.
        final byte[] payload = new byte[payloadBytes];
        final List<Product2<Object, Object>> dataToWrite = new ArrayList<>();
        for (int key = 0; key < recordCount; key++) {
            dataToWrite.add(new Tuple2<Object, Object>(key, payload));
        }

        writer.write(dataToWrite.iterator());
        Assert.assertEquals(2L, this.spillFilesCreated.size());
        writer.stop(true);
        readRecordsFromFile();
        assertSpillFilesWereCleanedUp();

        final ShuffleWriteMetrics writeMetrics = this.taskMetrics.shuffleWriteMetrics();
        final long mergedLength = this.mergedOutputFile.length();
        Assert.assertEquals(dataToWrite.size(), writeMetrics.recordsWritten());
        MatcherAssert.assertThat(Long.valueOf(this.taskMetrics.diskBytesSpilled()), Matchers.greaterThan(0L));
        MatcherAssert.assertThat(
            Long.valueOf(this.taskMetrics.diskBytesSpilled()), Matchers.lessThan(Long.valueOf(mergedLength)));
        MatcherAssert.assertThat(Long.valueOf(this.taskMetrics.memoryBytesSpilled()), Matchers.greaterThan(0L));
        Assert.assertEquals(this.mergedOutputFile.length(), writeMetrics.bytesWritten());
    }

    /** Runs the sort-buffer expansion scenario with radix sort disabled; expects two spill files. */
    @Test
    public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOff() throws Exception {
        final long expectedSpillFiles = 2L;
        this.conf.set("spark.shuffle.sort.useRadixSort", "false");
        writeEnoughRecordsToTriggerSortBufferExpansionAndSpill();
        Assert.assertEquals(expectedSpillFiles, this.spillFilesCreated.size());
    }

    /** Runs the sort-buffer expansion scenario with radix sort enabled; expects three spill files. */
    @Test
    public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOn() throws Exception {
        final long expectedSpillFiles = 3L;
        this.conf.set("spark.shuffle.sort.useRadixSort", "true");
        writeEnoughRecordsToTriggerSortBufferExpansionAndSpill();
        Assert.assertEquals(expectedSpillFiles, this.spillFilesCreated.size());
    }

    /**
     * Shared scenario for the radix on/off tests: limits managed memory to 64 KiB, writes
     * 4097 small records, and checks cleanup and spill metrics. The expected number of
     * spill files is asserted by the callers.
     */
    private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
        final int recordCount = 4096 + 1;
        this.memoryManager.limit(65536L);
        final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);

        final List<Product2<Object, Object>> dataToWrite = new ArrayList<>();
        int key = 0;
        while (key < recordCount) {
            dataToWrite.add(new Tuple2<Object, Object>(key, key));
            key++;
        }

        writer.write(dataToWrite.iterator());
        writer.stop(true);
        readRecordsFromFile();
        assertSpillFilesWereCleanedUp();

        final ShuffleWriteMetrics writeMetrics = this.taskMetrics.shuffleWriteMetrics();
        final long mergedLength = this.mergedOutputFile.length();
        Assert.assertEquals(dataToWrite.size(), writeMetrics.recordsWritten());
        MatcherAssert.assertThat(Long.valueOf(this.taskMetrics.diskBytesSpilled()), Matchers.greaterThan(0L));
        MatcherAssert.assertThat(
            Long.valueOf(this.taskMetrics.diskBytesSpilled()), Matchers.lessThan(Long.valueOf(mergedLength)));
        MatcherAssert.assertThat(Long.valueOf(this.taskMetrics.memoryBytesSpilled()), Matchers.greaterThan(0L));
        Assert.assertEquals(this.mergedOutputFile.length(), writeMetrics.bytesWritten());
    }

    /**
     * Writes a single record whose value is 2.5 MiB of seeded pseudo-random bytes and checks
     * that it is read back intact and that spill files are cleaned up.
     */
    @Test
    public void writeRecordsThatAreBiggerThanDiskWriteBufferSize() throws Exception {
        final int payloadBytes = 2621440; // 2.5 MiB
        final byte[] payload = new byte[payloadBytes];
        new Random(42L).nextBytes(payload);

        final List<Product2<Object, Object>> dataToWrite = new ArrayList<>();
        dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(payload)));

        final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
        writer.write(dataToWrite.iterator());
        writer.stop(true);

        Assert.assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
        assertSpillFilesWereCleanedUp();
    }

    /**
     * Writes three records of increasing size (1 byte, page size minus 4 bytes, and a full
     * page) and checks that all are read back intact and that spill files are cleaned up.
     */
    @Test
    public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception {
        final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
        final int pageSize = (int) this.taskMemoryManager.pageSizeBytes();
        final List<Product2<Object, Object>> dataToWrite = new ArrayList<>();

        // A tiny record.
        dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(new byte[1])));

        // A payload 4 bytes short of a page. The 4 is a byte count, not the partition count.
        // NOTE(review): presumably leaves room for a 4-byte per-record length prefix — confirm.
        final byte[] nearPageSize = new byte[pageSize - 4];
        new Random(42L).nextBytes(nearPageSize);
        dataToWrite.add(new Tuple2<Object, Object>(2, ByteBuffer.wrap(nearPageSize)));

        // A payload that is exactly one page long.
        final byte[] fullPage = new byte[pageSize];
        new Random(42L).nextBytes(fullPage);
        dataToWrite.add(new Tuple2<Object, Object>(3, ByteBuffer.wrap(fullPage)));

        writer.write(dataToWrite.iterator());
        writer.stop(true);

        Assert.assertEquals(HashMultiset.create(dataToWrite), HashMultiset.create(readRecordsFromFile()));
        assertSpillFilesWereCleanedUp();
    }

    /**
     * Inserts records around a forced spill, then stops the writer with {@code success = false}
     * and checks that every spill file created along the way has been removed.
     */
    @Test
    public void spillFilesAreDeletedWhenStoppingAfterError() throws IOException {
        final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);

        final Product2<Object, Object> first = new Tuple2<Object, Object>(1, 1);
        final Product2<Object, Object> second = new Tuple2<Object, Object>(2, 2);
        writer.insertRecordIntoSorter(first);
        writer.insertRecordIntoSorter(second);
        writer.forceSorterToSpill();

        final Product2<Object, Object> afterSpill = new Tuple2<Object, Object>(2, 2);
        writer.insertRecordIntoSorter(afterSpill);

        writer.stop(false);
        assertSpillFilesWereCleanedUp();
    }

    /**
     * Tracks {@code getPeakMemoryUsedBytes()} while inserting records with a stubbed 256-byte
     * page size: the peak must grow by one page on every 32nd insert (starting with the first)
     * and must stay unchanged otherwise, as well as across a forced spill, further inserts
     * and closing the output.
     *
     * <p>The whole scenario runs inside one try/finally so the writer is always stopped, even
     * when an assertion after the insert loop fails.
     */
    @Test
    public void testPeakMemoryUsed() throws Exception {
        final long pageSizeBytes = 256L;
        // NOTE(review): 32 presumably equals pageSizeBytes / 8, i.e. 8 bytes per record — confirm.
        final int recordsPerPage = 32;

        this.taskMemoryManager = Mockito.spy(this.taskMemoryManager);
        Mockito.when(this.taskMemoryManager.pageSizeBytes()).thenReturn(pageSizeBytes);
        final UnsafeShuffleWriter<Object, Object> writer = new UnsafeShuffleWriter<>(
            this.blockManager,
            this.shuffleBlockResolver,
            this.taskMemoryManager,
            new SerializedShuffleHandle<>(0, 1, this.shuffleDep),
            0,
            this.taskContext,
            this.conf);

        try {
            long previousPeak = writer.getPeakMemoryUsedBytes();
            for (int i = 0; i < recordsPerPage * 10; i++) {
                writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
                final long newPeak = writer.getPeakMemoryUsedBytes();
                if (i % recordsPerPage == 0) {
                    // A new page is expected on this insert, so the peak grows by one page.
                    Assert.assertEquals(previousPeak + pageSizeBytes, newPeak);
                } else {
                    Assert.assertEquals(previousPeak, newPeak);
                }
                previousPeak = newPeak;
            }

            // Spilling must not change the peak.
            writer.forceSorterToSpill();
            Assert.assertEquals(previousPeak, writer.getPeakMemoryUsedBytes());

            // Neither must refilling one page's worth of records after the spill.
            for (int i = 0; i < recordsPerPage; i++) {
                writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
            }
            Assert.assertEquals(previousPeak, writer.getPeakMemoryUsedBytes());

            // Closing the writer must not change the peak either.
            writer.closeAndWriteOutput();
            Assert.assertEquals(previousPeak, writer.getPeakMemoryUsedBytes());
        } finally {
            writer.stop(false);
        }
    }
}
