package org.apache.spark.network.shuffle;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.server.BlockPushNonFatalFailure;
import org.apache.spark.network.shuffle.ErrorHandler;
import org.apache.spark.network.shuffle.RemoteBlockPushResolver;
import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.class */
public class RemoteBlockPushResolverSuite {
    private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class);
    private final String TEST_APP = "testApp";
    private final String MERGE_DIRECTORY = "merge_manager";
    private final int NO_ATTEMPT_ID = -1;
    private final int ATTEMPT_ID_1 = 1;
    private final int ATTEMPT_ID_2 = 2;
    private final String MERGE_DIRECTORY_META = "shuffleManager:{\"mergeDir\": \"merge_manager\"}";
    private final String MERGE_DIRECTORY_META_1 = "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}";
    private final String MERGE_DIRECTORY_META_2 = "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}";
    private final String INVALID_MERGE_DIRECTORY_META = "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"}";
    private final String BLOCK_MANAGER_DIR = "blockmgr-193d8401";
    private TransportConf conf;
    private RemoteBlockPushResolver pushResolver;
    private Path[] localDirs;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite$PushBlock.class */
    public static class PushBlock {
        private final int shuffleId;
        private final int shuffleMergeId;
        private final int mapIndex;
        private final int reduceId;
        private final ByteBuffer buffer;

        PushBlock(int i, int i2, int i3, int i4, ByteBuffer byteBuffer) {
            this.shuffleId = i;
            this.shuffleMergeId = i2;
            this.mapIndex = i3;
            this.reduceId = i4;
            this.buffer = byteBuffer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite$TestMergeShuffleFile.class */
    public static class TestMergeShuffleFile extends RemoteBlockPushResolver.MergeShuffleFile {
        private DataOutputStream activeDos;
        private File file;
        private FileChannel channel;

        private TestMergeShuffleFile(File file) throws IOException {
            super(file);
            this.file = file;
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            this.channel = fileOutputStream.getChannel();
            this.activeDos = new DataOutputStream(fileOutputStream);
        }

        public DataOutputStream getDos() {
            return this.activeDos;
        }

        FileChannel getChannel() {
            return this.channel;
        }

        void close() throws IOException {
            this.activeDos.close();
        }

        void restore() throws IOException {
            FileOutputStream fileOutputStream = new FileOutputStream(this.file, true);
            this.channel = fileOutputStream.getChannel();
            this.activeDos = new DataOutputStream(fileOutputStream);
        }
    }

    @Before
    public void before() throws IOException {
        this.localDirs = createLocalDirs(2);
        this.conf = new TransportConf("shuffle", new MapConfigProvider(ImmutableMap.of("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", "4")));
        this.pushResolver = new RemoteBlockPushResolver(this.conf, (File) null);
        registerExecutor("testApp", prepareLocalDirs(this.localDirs, "merge_manager"), "shuffleManager:{\"mergeDir\": \"merge_manager\"}");
    }

    @After
    public void after() {
        try {
            for (Path path : this.localDirs) {
                FileUtils.deleteDirectory(path.toFile());
            }
            removeApplication("testApp");
        } catch (Exception e) {
            log.debug("Error while tearing down", e);
        }
    }

    @Test
    public void testErrorLogging() {
        ErrorHandler.BlockPushErrorHandler createErrorHandler = RemoteBlockPushResolver.createErrorHandler();
        Assert.assertFalse(createErrorHandler.shouldLogError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
        Assert.assertFalse(createErrorHandler.shouldLogError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
        Assert.assertFalse(createErrorHandler.shouldLogError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
        Assert.assertFalse(createErrorHandler.shouldLogError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
        Assert.assertTrue(createErrorHandler.shouldLogError(new Throwable()));
    }

    @Test
    public void testNoIndexFile() {
        Assert.assertTrue(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0);
        })).getMessage().startsWith("Merged shuffle index file"));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v5, types: [int[], int[][]] */
    @Test
    public void testBasicBlockMerge() throws IOException {
        pushBlockHelper("testApp", -1, new PushBlock[]{new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[4])), new PushBlock(0, 0, 1, 0, ByteBuffer.wrap(new byte[5]))});
        validateMergeStatuses(this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0)), new int[]{0}, new long[]{9});
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4, 5}, new int[]{new int[]{0}, new int[]{1}});
        verifyMetrics(9L, 0L, 0L, 0L, 0L, 0L, 0L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v8, types: [int[], int[][]] */
    @Test
    public void testDividingMergedBlocksIntoChunks() throws IOException {
        pushBlockHelper("testApp", -1, new PushBlock[]{new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[2])), new PushBlock(0, 0, 1, 0, ByteBuffer.wrap(new byte[3])), new PushBlock(0, 0, 2, 0, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 0, 3, 0, ByteBuffer.wrap(new byte[3]))});
        validateMergeStatuses(this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0)), new int[]{0}, new long[]{13});
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{5, 5, 3}, new int[]{new int[]{0, 1}, new int[]{2}, new int[]{3}});
        verifyMetrics(13L, 0L, 0L, 0L, 0L, 0L, 0L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v6, types: [int[], int[][]] */
    @Test
    public void testFinalizeWithMultipleReducePartitions() throws IOException {
        pushBlockHelper("testApp", -1, new PushBlock[]{new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[2])), new PushBlock(0, 0, 1, 0, ByteBuffer.wrap(new byte[3])), new PushBlock(0, 0, 0, 1, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 0, 1, 1, ByteBuffer.wrap(new byte[3]))});
        validateMergeStatuses(this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0)), new int[]{0, 1}, new long[]{5, 8});
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{5}, new int[]{new int[]{0, 1}});
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v6, types: [int[], int[][]] */
    @Test
    public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[3]));
        verifyMetrics(2L, 0L, 0L, 3L, 1L, 0L, 0L);
        Assert.assertEquals("cached bytes", 3L, ((Counter) this.pushResolver.getMetrics().getMetrics().get("deferredBlockBytes")).getCount());
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[3]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4, 6}, new int[]{new int[]{0}, new int[]{1}});
        verifyMetrics(10L, 0L, 0L, 0L, 0L, 0L, 0L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v6, types: [int[], int[][]] */
    @Test
    public void testDeferredBufsAreWrittenDuringOnComplete() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[3]));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[3]));
        verifyMetrics(2L, 0L, 0L, 6L, 2L, 0L, 0L);
        Assert.assertEquals("cached bytes", 6L, ((Counter) this.pushResolver.getMetrics().getMetrics().get("deferredBlockBytes")).getCount());
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4, 6}, new int[]{new int[]{0}, new int[]{1}});
        verifyMetrics(10L, 0L, 0L, 0L, 0L, 0L, 0L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v4, types: [int[], int[][]] */
    @Test
    public void testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4}, new int[]{new int[]{0}});
        verifyMetrics(4L, 0L, 1L, 0L, 0L, 0L, 4L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v4, types: [int[], int[][]] */
    @Test
    public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4}, new int[]{new int[]{0}});
        verifyMetrics(4L, 0L, 0L, 0L, 0L, 0L, 4L);
    }

    @Test
    public void testFailureAfterData() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onFailure(receiveBlockDataAsStream.getID(), new RuntimeException("Forced Failure"));
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        Assert.assertEquals("num-chunks", 0L, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0).getNumChunks());
        verifyMetrics(4L, 0L, 0L, 0L, 0L, 0L, 4L);
    }

    @Test
    public void testFailureAfterMultipleDataBlocks() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[3]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onFailure(receiveBlockDataAsStream.getID(), new RuntimeException("Forced Failure"));
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        Assert.assertEquals("num-chunks", 0L, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0).getNumChunks());
        verifyMetrics(9L, 0L, 0L, 0L, 0L, 0L, 9L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v3, types: [int[], int[][]] */
    @Test
    public void testFailureAfterComplete() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[3]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        receiveBlockDataAsStream.onFailure(receiveBlockDataAsStream.getID(), new RuntimeException("Forced Failure"));
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{9}, new int[]{new int[]{0}});
        verifyMetrics(9L, 0L, 0L, 0L, 0L, 0L, 9L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v4, types: [int[], int[][]] */
    @Test
    public void testBlockReceivedAfterMergeFinalize() throws IOException {
        ByteBuffer[] byteBufferArr = {ByteBuffer.wrap(new byte[4]), ByteBuffer.wrap(new byte[5])};
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        for (ByteBuffer byteBuffer : byteBufferArr) {
            receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), byteBuffer);
        }
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[4]));
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        }).getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, receiveBlockDataAsStream2.getID());
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{9}, new int[]{new int[]{0}});
        verifyMetrics(9L, 0L, 1L, 0L, 0L, 0L, 4L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v5, types: [int[], int[][]] */
    @Test
    public void testIncompleteStreamsAreOverwritten() throws IOException {
        registerExecutor("testApp", prepareLocalDirs(this.localDirs, "merge_manager"), "shuffleManager:{\"mergeDir\": \"merge_manager\"}");
        byte[] bArr = new byte[4];
        ThreadLocalRandom.current().nextBytes(bArr);
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        byte[] bArr2 = new byte[10];
        ThreadLocalRandom.current().nextBytes(bArr2);
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(bArr2));
        receiveBlockDataAsStream.onFailure(receiveBlockDataAsStream.getID(), new RuntimeException("forced error"));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(bArr, 0, 2));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        StreamCallbackWithID receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 2, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(bArr, 2, 2));
        receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4}, new int[]{new int[]{1, 2}});
        Assert.assertArrayEquals(bArr, this.pushResolver.getMergedBlockData("testApp", 0, 0, 0, 0).nioByteBuffer().array());
        verifyMetrics(14L, 0L, 0L, 0L, 0L, 0L, 10L);
    }

    @Test
    public void testCollision() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        }).getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, receiveBlockDataAsStream2.getID());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v5, types: [int[], int[][]] */
    @Test
    public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onFailure(receiveBlockDataAsStream2.getID(), new RuntimeException("forced error"));
        StreamCallbackWithID receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 2, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[5]));
        Assert.assertEquals("cached bytes", 5L, ((Counter) this.pushResolver.getMetrics().getMetrics().get("deferredBlockBytes")).getCount());
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        }).getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, receiveBlockDataAsStream3.getID());
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4}, new int[]{new int[]{0}});
        verifyMetrics(4L, 1L, 0L, 0L, 0L, 0L, 0L);
    }

    @Test
    public void testUpdateLocalDirsOnlyOnce() throws IOException {
        String str = "updateLocalDirsOnlyOnceTest";
        Path[] createLocalDirs = createLocalDirs(1);
        registerExecutor("updateLocalDirsOnlyOnceTest", prepareLocalDirs(createLocalDirs, "merge_manager"), "shuffleManager:{\"mergeDir\": \"merge_manager\"}");
        Assert.assertEquals(this.pushResolver.getMergedBlockDirs("updateLocalDirsOnlyOnceTest").length, 1L);
        Assert.assertTrue(this.pushResolver.getMergedBlockDirs("updateLocalDirsOnlyOnceTest")[0].contains(createLocalDirs[0].toFile().getPath()));
        registerExecutor("updateLocalDirsOnlyOnceTest", prepareLocalDirs(this.localDirs, "merge_manager"), "shuffleManager:{\"mergeDir\": \"merge_manager\"}");
        Assert.assertEquals(this.pushResolver.getMergedBlockDirs("updateLocalDirsOnlyOnceTest").length, 1L);
        Assert.assertTrue(this.pushResolver.getMergedBlockDirs("updateLocalDirsOnlyOnceTest")[0].contains(createLocalDirs[0].toFile().getPath()));
        removeApplication("updateLocalDirsOnlyOnceTest");
        Assert.assertEquals(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.pushResolver.getMergedBlockDirs(str);
        })).getMessage(), "application updateLocalDirsOnlyOnceTest is not registered or NM was restarted.");
    }

    @Test
    public void testExecutorRegisterWithInvalidJsonForPushShuffle() throws IOException {
        String str = "executorRegisterWithInvalidShuffleManagerMeta";
        Path[] createLocalDirs = createLocalDirs(1);
        Assert.assertEquals("Failed to get the merge directory information from the shuffleManagerMeta shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"} in executor registration message", ((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            registerExecutor(str, prepareLocalDirs(createLocalDirs, "merge_manager"), "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"}");
        })).getMessage());
    }

    @Test
    public void testExecutorRegistrationFromTwoAppAttempts() throws IOException {
        String str = "testExecutorRegistrationFromTwoAppAttempts";
        Path[] createLocalDirs = createLocalDirs(1);
        registerExecutor("testExecutorRegistrationFromTwoAppAttempts", prepareLocalDirs(createLocalDirs, "merge_manager_1"), "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}");
        Assert.assertEquals(this.pushResolver.getMergedBlockDirs("testExecutorRegistrationFromTwoAppAttempts").length, 1L);
        Assert.assertTrue(this.pushResolver.getMergedBlockDirs("testExecutorRegistrationFromTwoAppAttempts")[0].contains(createLocalDirs[0].toFile().getPath()));
        registerExecutor("testExecutorRegistrationFromTwoAppAttempts", prepareLocalDirs(this.localDirs, "merge_manager_1"), "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}");
        Assert.assertEquals(this.pushResolver.getMergedBlockDirs("testExecutorRegistrationFromTwoAppAttempts").length, 1L);
        Assert.assertTrue(this.pushResolver.getMergedBlockDirs("testExecutorRegistrationFromTwoAppAttempts")[0].contains(createLocalDirs[0].toFile().getPath()));
        Path[] createLocalDirs2 = createLocalDirs(2);
        registerExecutor("testExecutorRegistrationFromTwoAppAttempts", prepareLocalDirs(createLocalDirs2, "merge_manager_2"), "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}");
        Assert.assertEquals(this.pushResolver.getMergedBlockDirs("testExecutorRegistrationFromTwoAppAttempts").length, 2L);
        Assert.assertTrue(this.pushResolver.getMergedBlockDirs("testExecutorRegistrationFromTwoAppAttempts")[0].contains(createLocalDirs2[0].toFile().getPath()));
        removeApplication("testExecutorRegistrationFromTwoAppAttempts");
        Assert.assertEquals(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.pushResolver.getMergedBlockDirs(str);
        })).getMessage(), "application testExecutorRegistrationFromTwoAppAttempts is not registered or NM was restarted.");
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v3, types: [int[], int[][]] */
    @Test
    public void testCleanUpDirectory() throws IOException, InterruptedException {
        final Semaphore semaphore = new Semaphore(0);
        this.pushResolver = new RemoteBlockPushResolver(this.conf, null) { // from class: org.apache.spark.network.shuffle.RemoteBlockPushResolverSuite.1
            void deleteExecutorDirs(RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo) {
                super.deleteExecutorDirs(appShuffleInfo);
                semaphore.release();
            }
        };
        registerExecutor("cleanUpDirectory", prepareLocalDirs(createLocalDirs(1), "merge_manager"), "shuffleManager:{\"mergeDir\": \"merge_manager\"}");
        pushBlockHelper("cleanUpDirectory", -1, new PushBlock[]{new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[4]))});
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("cleanUpDirectory", -1, 0, 0));
        validateChunks("cleanUpDirectory", 0, 0, 0, this.pushResolver.getMergedBlockMeta("cleanUpDirectory", 0, 0, 0), new int[]{4}, new int[]{new int[]{0}});
        String[] mergedBlockDirs = this.pushResolver.getMergedBlockDirs("cleanUpDirectory");
        this.pushResolver.applicationRemoved("cleanUpDirectory", true);
        semaphore.acquire();
        for (String str : mergedBlockDirs) {
            Assert.assertFalse(Files.exists(Paths.get(str, new String[0]), new LinkOption[0]));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v6, types: [int[], int[][]] */
    @Test
    public void testRecoverIndexFileAfterIOExceptions() throws IOException {
        useTestFiles(true, false);
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        TestMergeShuffleFile testMergeShuffleFile = (TestMergeShuffleFile) receiveBlockDataAsStream.getPartitionInfo().getIndexFile();
        testMergeShuffleFile.close();
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        Assert.assertEquals("index position", 16L, testMergeShuffleFile.getPos());
        testMergeShuffleFile.restore();
        StreamCallbackWithID receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 2, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        Assert.assertEquals("index position", 24L, testMergeShuffleFile.getPos());
        validateMergeStatuses(this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0)), new int[]{0}, new long[]{11});
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4, 7}, new int[]{new int[]{0}, new int[]{1, 2}});
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v5, types: [int[], int[][]] */
    @Test
    public void testRecoverIndexFileAfterIOExceptionsInFinalize() throws IOException {
        useTestFiles(true, false);
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        TestMergeShuffleFile testMergeShuffleFile = (TestMergeShuffleFile) receiveBlockDataAsStream.getPartitionInfo().getIndexFile();
        testMergeShuffleFile.close();
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        Assert.assertEquals("index position", 16L, testMergeShuffleFile.getPos());
        testMergeShuffleFile.restore();
        MergeStatuses finalizeShuffleMerge = this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        Assert.assertEquals("index position", 24L, testMergeShuffleFile.getPos());
        validateMergeStatuses(finalizeShuffleMerge, new int[]{0}, new long[]{9});
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4, 5}, new int[]{new int[]{0}, new int[]{1}});
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v6, types: [int[], int[][]] */
    @Test
    public void testRecoverMetaFileAfterIOExceptions() throws IOException {
        useTestFiles(false, true);
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = receiveBlockDataAsStream.getPartitionInfo();
        TestMergeShuffleFile testMergeShuffleFile = (TestMergeShuffleFile) partitionInfo.getMetaFile();
        long pos = testMergeShuffleFile.getPos();
        testMergeShuffleFile.close();
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        Assert.assertEquals("index position", 16L, partitionInfo.getIndexFile().getPos());
        Assert.assertEquals("meta position", pos, testMergeShuffleFile.getPos());
        testMergeShuffleFile.restore();
        StreamCallbackWithID receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 2, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        Assert.assertEquals("index position", 24L, partitionInfo.getIndexFile().getPos());
        Assert.assertTrue("meta position", testMergeShuffleFile.getPos() > pos);
        validateMergeStatuses(this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0)), new int[]{0}, new long[]{11});
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4, 7}, new int[]{new int[]{0}, new int[]{1, 2}});
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v5, types: [int[], int[][]] */
    @Test
    public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException {
        useTestFiles(false, true);
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = receiveBlockDataAsStream.getPartitionInfo();
        TestMergeShuffleFile testMergeShuffleFile = (TestMergeShuffleFile) partitionInfo.getMetaFile();
        long pos = testMergeShuffleFile.getPos();
        testMergeShuffleFile.close();
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        RemoteBlockPushResolver.MergeShuffleFile indexFile = partitionInfo.getIndexFile();
        Assert.assertEquals("index position", 16L, indexFile.getPos());
        Assert.assertEquals("meta position", pos, testMergeShuffleFile.getPos());
        testMergeShuffleFile.restore();
        MergeStatuses finalizeShuffleMerge = this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        Assert.assertEquals("index position", 24L, indexFile.getPos());
        Assert.assertTrue("meta position", testMergeShuffleFile.getPos() > pos);
        validateMergeStatuses(finalizeShuffleMerge, new int[]{0}, new long[]{9});
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4, 5}, new int[]{new int[]{0}, new int[]{1}});
    }

    @Test
    public void testIOExceptionsExceededThreshold() throws IOException {
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = receiveBlockDataAsStream.getPartitionInfo();
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        partitionInfo.getDataChannel().close();
        for (int i = 1; i < 5; i++) {
            RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, i, 0, 0));
            receiveBlockDataAsStream2.onFailure(receiveBlockDataAsStream2.getID(), (IOException) Assert.assertThrows(IOException.class, () -> {
                receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
            }));
        }
        Assert.assertEquals(4L, partitionInfo.getNumIOExceptions());
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 5, 0, 0));
        Assert.assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_5_0", ((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
            receiveBlockDataAsStream3.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[1]));
        })).getMessage());
    }

    @Test
    public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOException {
        useTestFiles(true, false);
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = receiveBlockDataAsStream.getPartitionInfo();
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        ((TestMergeShuffleFile) partitionInfo.getIndexFile()).close();
        for (int i = 1; i < 5; i++) {
            RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, i, 0, 0));
            receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
            receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        }
        Assert.assertEquals(4L, partitionInfo.getNumIOExceptions());
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 5, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[4]));
        Assert.assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_5_0", ((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
            receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        })).getMessage());
    }

    @Test
    public void testRequestForAbortedShufflePartitionThrowsException() throws IOException {
        testIOExceptionsDuringMetaUpdateIncreasesExceptionCount();
        Assert.assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_10_0", ((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
            this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 10, 0, 0));
        })).getMessage());
    }

    @Test
    public void testPendingBlockIsAbortedImmediately() throws IOException {
        useTestFiles(true, false);
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        ((TestMergeShuffleFile) receiveBlockDataAsStream.getPartitionInfo().getIndexFile()).close();
        for (int i = 1; i < 6; i++) {
            RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, i, 0, 0));
            receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
            if (i < 5) {
                receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
            } else {
                receiveBlockDataAsStream2.onFailure(receiveBlockDataAsStream2.getID(), Assert.assertThrows(Throwable.class, () -> {
                    receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
                }));
            }
        }
        Assert.assertEquals(5L, r0.getNumIOExceptions());
        Assert.assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0_0", ((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
            receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        })).getMessage());
    }

    @Test
    public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IOException {
        useTestFiles(true, false);
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        TestMergeShuffleFile testMergeShuffleFile = (TestMergeShuffleFile) receiveBlockDataAsStream.getPartitionInfo().getIndexFile();
        testMergeShuffleFile.close();
        for (int i = 1; i < 5; i++) {
            RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, i, 0, 0));
            receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
            receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        }
        Assert.assertEquals(4L, r0.getNumIOExceptions());
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 5, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[5]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream3.onFailure(receiveBlockDataAsStream3.getID(), Assert.assertThrows(Throwable.class, () -> {
            receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        }));
        Assert.assertEquals(5L, r0.getNumIOExceptions());
        testMergeShuffleFile.restore();
        Assert.assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0_0", ((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
            receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        })).getMessage());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v8, types: [int[], int[][]] */
    @Test
    public void testFailureWhileTruncatingFiles() throws IOException {
        useTestFiles(true, false);
        pushBlockHelper("testApp", -1, new PushBlock[]{new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[2])), new PushBlock(0, 0, 1, 0, ByteBuffer.wrap(new byte[3])), new PushBlock(0, 0, 0, 1, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 0, 1, 1, ByteBuffer.wrap(new byte[3]))});
        RemoteBlockPushResolver.PushBlockStreamCallback receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 2, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        ((TestMergeShuffleFile) receiveBlockDataAsStream.getPartitionInfo().getIndexFile()).close();
        validateMergeStatuses(this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0)), new int[]{1}, new long[]{8});
        validateChunks("testApp", 0, 0, 1, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 1), new int[]{5, 3}, new int[]{new int[]{0}, new int[]{1}});
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v6, types: [int[], int[][]] */
    @Test
    public void testOnFailureInvokedMoreThanOncePerBlock() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onFailure(receiveBlockDataAsStream.getID(), new RuntimeException("forced error"));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[5]));
        receiveBlockDataAsStream.onFailure(receiveBlockDataAsStream.getID(), new RuntimeException("2nd forced error"));
        StreamCallbackWithID receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 3, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{9, 2}, new int[]{new int[]{1}, new int[]{3}});
        removeApplication("testApp");
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v6, types: [int[], int[][]] */
    @Test
    public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 1, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[5]));
        receiveBlockDataAsStream2.onFailure(receiveBlockDataAsStream3.getID(), new RuntimeException("forced error"));
        StreamCallbackWithID receiveBlockDataAsStream4 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 0, 2, 0, 0));
        receiveBlockDataAsStream4.onData(receiveBlockDataAsStream4.getID(), ByteBuffer.wrap(new byte[2]));
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream4.onComplete(receiveBlockDataAsStream4.getID());
        }).getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, receiveBlockDataAsStream4.getID());
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{11}, new int[]{new int[]{0, 1}});
        removeApplication("testApp");
    }

    @Test
    public void testPushBlockFromPreviousAttemptIsRejected() throws IOException, InterruptedException {
        final Semaphore semaphore = new Semaphore(0);
        this.pushResolver = new RemoteBlockPushResolver(this.conf, null) { // from class: org.apache.spark.network.shuffle.RemoteBlockPushResolverSuite.2
            void closeAndDeletePartitionsIfNeeded(RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo, boolean z) {
                super.closeAndDeletePartitionsIfNeeded(appShuffleInfo, z);
                semaphore.release();
            }
        };
        String str = "testPushBlockFromPreviousAttemptIsRejected";
        registerExecutor("testPushBlockFromPreviousAttemptIsRejected", prepareLocalDirs(createLocalDirs(1), "merge_manager_1"), "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}");
        ByteBuffer[] byteBufferArr = {ByteBuffer.wrap(new byte[4]), ByteBuffer.wrap(new byte[5])};
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testPushBlockFromPreviousAttemptIsRejected", 1, 0, 0, 0, 0, 0));
        for (ByteBuffer byteBuffer : byteBufferArr) {
            receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), byteBuffer);
        }
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        RemoteBlockPushResolver.AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo = (RemoteBlockPushResolver.AppShuffleMergePartitionsInfo) this.pushResolver.validateAndGetAppShuffleInfo("testPushBlockFromPreviousAttemptIsRejected").getShuffles().get(0);
        for (RemoteBlockPushResolver.AppShufflePartitionInfo appShufflePartitionInfo : appShuffleMergePartitionsInfo.getShuffleMergePartitions().values()) {
            Assert.assertTrue(appShufflePartitionInfo.getDataChannel().isOpen());
            Assert.assertTrue(appShufflePartitionInfo.getMetaFile().getChannel().isOpen());
            Assert.assertTrue(appShufflePartitionInfo.getIndexFile().getChannel().isOpen());
        }
        registerExecutor("testPushBlockFromPreviousAttemptIsRejected", prepareLocalDirs(createLocalDirs(2), "merge_manager_2"), "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}");
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testPushBlockFromPreviousAttemptIsRejected", 2, 0, 0, 1, 0, 0));
        for (ByteBuffer byteBuffer2 : byteBufferArr) {
            receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), byteBuffer2);
        }
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        semaphore.acquire();
        for (RemoteBlockPushResolver.AppShufflePartitionInfo appShufflePartitionInfo2 : appShuffleMergePartitionsInfo.getShuffleMergePartitions().values()) {
            Assert.assertFalse(appShufflePartitionInfo2.getDataChannel().isOpen());
            Assert.assertFalse(appShufflePartitionInfo2.getMetaFile().getChannel().isOpen());
            Assert.assertFalse(appShufflePartitionInfo2.getIndexFile().getChannel().isOpen());
        }
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            this.pushResolver.receiveBlockDataAsStream(new PushBlockStream(str, 1, 0, 0, 1, 0, 0));
        }).getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, receiveBlockDataAsStream2.getID());
    }

    @Test
    public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted() throws IOException {
        String str = "testFinalizeShuffleMergeFromPreviousAttemptIsAborted";
        registerExecutor("testFinalizeShuffleMergeFromPreviousAttemptIsAborted", prepareLocalDirs(createLocalDirs(1), "merge_manager_1"), "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}");
        ByteBuffer[] byteBufferArr = {ByteBuffer.wrap(new byte[4]), ByteBuffer.wrap(new byte[5])};
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testFinalizeShuffleMergeFromPreviousAttemptIsAborted", 1, 0, 0, 0, 0, 0));
        for (ByteBuffer byteBuffer : byteBufferArr) {
            receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), byteBuffer);
        }
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        registerExecutor("testFinalizeShuffleMergeFromPreviousAttemptIsAborted", prepareLocalDirs(createLocalDirs(2), "merge_manager_2"), "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}");
        Assert.assertEquals(((IllegalArgumentException) Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(str, 1, 0, 0));
        })).getMessage(), String.format("The attempt id %s in this FinalizeShuffleMerge message does not match with the current attempt id %s stored in shuffle service for application %s", 1, 2, "testFinalizeShuffleMergeFromPreviousAttemptIsAborted"));
    }

    @Test
    public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted() throws IOException, InterruptedException {
        final Semaphore semaphore = new Semaphore(0);
        this.pushResolver = new RemoteBlockPushResolver(this.conf, null) { // from class: org.apache.spark.network.shuffle.RemoteBlockPushResolverSuite.3
            void closeAndDeletePartitionsIfNeeded(RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo, boolean z) {
                super.closeAndDeletePartitionsIfNeeded(appShuffleInfo, z);
                semaphore.release();
            }
        };
        registerExecutor("testOngoingMergeOfBlockFromPreviousAttemptIsAborted", prepareLocalDirs(createLocalDirs(1), "merge_manager_1"), "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}");
        ByteBuffer[] byteBufferArr = {ByteBuffer.wrap(new byte[4]), ByteBuffer.wrap(new byte[5]), ByteBuffer.wrap(new byte[6]), ByteBuffer.wrap(new byte[7])};
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testOngoingMergeOfBlockFromPreviousAttemptIsAborted", 1, 0, 0, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), byteBufferArr[0]);
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), byteBufferArr[1]);
        registerExecutor("testOngoingMergeOfBlockFromPreviousAttemptIsAborted", prepareLocalDirs(createLocalDirs(2), "merge_manager_2"), "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}");
        semaphore.acquire();
        Assert.assertThrows(ClosedChannelException.class, () -> {
            receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), byteBufferArr[3]);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v4, types: [int[], int[][]] */
    @Test
    public void testBlockPushWithOlderShuffleMergeId() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 1, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 2, 0, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        BlockPushNonFatalFailure assertThrows = Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        });
        receiveBlockDataAsStream.onFailure(receiveBlockDataAsStream.getID(), new RuntimeException("Forced Failure"));
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(assertThrows.getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, receiveBlockDataAsStream.getID());
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 2));
        validateChunks("testApp", 0, 2, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 2, 0), new int[]{4}, new int[]{new int[]{0}});
        verifyMetrics(6L, 0L, 0L, 0L, 0L, 2L, 4L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v4, types: [int[], int[][]] */
    @Test
    public void testFinalizeWithOlderShuffleMergeId() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 1, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 2, 0, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        BlockPushNonFatalFailure assertThrows = Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        });
        receiveBlockDataAsStream.onFailure(receiveBlockDataAsStream.getID(), new RuntimeException("Forced Failure"));
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(assertThrows.getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, receiveBlockDataAsStream.getID());
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        Assert.assertEquals("Shuffle merge finalize request for shuffle 0 with shuffleMergeId 1 is stale shuffle finalize request as shuffle blocks of a higher shuffleMergeId for the shuffle is already being pushed", ((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 1));
        })).getMessage());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 2));
        validateChunks("testApp", 0, 2, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 2, 0), new int[]{4}, new int[]{new int[]{0}});
        verifyMetrics(6L, 0L, 0L, 0L, 0L, 2L, 4L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v5, types: [int[], int[][]] */
    @Test
    public void testFinalizeOfDeterminateShuffle() throws IOException {
        pushBlockHelper("testApp", -1, new PushBlock[]{new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[4])), new PushBlock(0, 0, 1, 0, ByteBuffer.wrap(new byte[5]))});
        MergeStatuses finalizeShuffleMerge = this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 0));
        Assert.assertTrue("Determinate shuffle should be marked finalized", ((RemoteBlockPushResolver.AppShuffleMergePartitionsInfo) this.pushResolver.validateAndGetAppShuffleInfo("testApp").getShuffles().get(0)).isFinalized());
        validateMergeStatuses(finalizeShuffleMerge, new int[]{0}, new long[]{9});
        validateChunks("testApp", 0, 0, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0), new int[]{4, 5}, new int[]{new int[]{0}, new int[]{1}});
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v4, types: [int[], int[][]] */
    @Test
    public void testBlockFetchWithOlderShuffleMergeId() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 1, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 0, 2, 0, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        }).getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, receiveBlockDataAsStream.getID());
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 2));
        Assert.assertEquals("MergedBlockMeta fetch for shuffle 0 with shuffleMergeId 0 reduceId 0 is stale shuffle block fetch request as shuffle blocks of a higher shuffleMergeId for the shuffle is available", ((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.pushResolver.getMergedBlockMeta("testApp", 0, 0, 0);
        })).getMessage());
        Assert.assertEquals("Shuffle merge finalize request for shuffle 0 with shuffleMergeId 1 is stale shuffle finalize request as shuffle blocks of a higher shuffleMergeId for the shuffle is already being pushed", ((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 0, 1));
        })).getMessage());
        Assert.assertEquals("MergedBlockData fetch for shuffle 0 with shuffleMergeId 1 reduceId 0 is stale shuffle block fetch request as shuffle blocks of a higher shuffleMergeId for the shuffle is available", ((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.pushResolver.getMergedBlockData("testApp", 0, 1, 0, 0);
        })).getMessage());
        validateChunks("testApp", 0, 2, 0, this.pushResolver.getMergedBlockMeta("testApp", 0, 2, 0), new int[]{4}, new int[]{new int[]{0}});
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v5, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r7v9, types: [int[], int[][]] */
    @Test
    public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedException {
        final Semaphore semaphore = new Semaphore(0);
        final CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.pushResolver = new RemoteBlockPushResolver(this.conf, null) { // from class: org.apache.spark.network.shuffle.RemoteBlockPushResolverSuite.4
            void closeAndDeleteOutdatedPartitions(RemoteBlockPushResolver.AppAttemptShuffleMergeId appAttemptShuffleMergeId, Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> map) {
                copyOnWriteArrayList.add(appAttemptShuffleMergeId);
                super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, map);
                semaphore.release();
            }
        };
        registerExecutor("testCleanupOlderShuffleMergeId", prepareLocalDirs(this.localDirs, "merge_manager"), "shuffleManager:{\"mergeDir\": \"merge_manager\"}");
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testCleanupOlderShuffleMergeId", -1, 0, 1, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testCleanupOlderShuffleMergeId", -1, 0, 2, 0, 0, 0));
        RemoteBlockPushResolver.AppShuffleInfo validateAndGetAppShuffleInfo = this.pushResolver.validateAndGetAppShuffleInfo("testCleanupOlderShuffleMergeId");
        semaphore.acquire();
        Assert.assertEquals(1L, copyOnWriteArrayList.size());
        Assert.assertEquals(1L, ((RemoteBlockPushResolver.AppAttemptShuffleMergeId) copyOnWriteArrayList.iterator().next()).shuffleMergeId);
        copyOnWriteArrayList.clear();
        Assert.assertFalse("Data files on the disk should be cleaned up", validateAndGetAppShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
        Assert.assertFalse("Meta files on the disk should be cleaned up", validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
        Assert.assertFalse("Index files on the disk should be cleaned up", new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists());
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testCleanupOlderShuffleMergeId", -1, 0, 2));
        validateChunks("testCleanupOlderShuffleMergeId", 0, 2, 0, this.pushResolver.getMergedBlockMeta("testCleanupOlderShuffleMergeId", 0, 2, 0), new int[]{4}, new int[]{new int[]{0}});
        StreamCallbackWithID receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testCleanupOlderShuffleMergeId", -1, 0, 3, 0, 0, 0));
        semaphore.acquire();
        Assert.assertEquals(1L, copyOnWriteArrayList.size());
        Assert.assertEquals(2L, ((RemoteBlockPushResolver.AppAttemptShuffleMergeId) copyOnWriteArrayList.iterator().next()).shuffleMergeId);
        copyOnWriteArrayList.clear();
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testCleanupOlderShuffleMergeId", -1, 0, 3));
        validateChunks("testCleanupOlderShuffleMergeId", 0, 3, 0, this.pushResolver.getMergedBlockMeta("testCleanupOlderShuffleMergeId", 0, 3, 0), new int[]{2}, new int[]{new int[]{0}});
        StreamCallbackWithID receiveBlockDataAsStream4 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testCleanupOlderShuffleMergeId", -1, 0, 4, 0, 0, 0));
        semaphore.acquire();
        Assert.assertEquals(1L, copyOnWriteArrayList.size());
        Assert.assertEquals(3L, ((RemoteBlockPushResolver.AppAttemptShuffleMergeId) copyOnWriteArrayList.iterator().next()).shuffleMergeId);
        copyOnWriteArrayList.clear();
        receiveBlockDataAsStream4.onData(receiveBlockDataAsStream4.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream4.onComplete(receiveBlockDataAsStream4.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testCleanupOlderShuffleMergeId", -1, 0, 5));
        semaphore.acquire();
        Assert.assertEquals(1L, copyOnWriteArrayList.size());
        Assert.assertEquals(4L, ((RemoteBlockPushResolver.AppAttemptShuffleMergeId) copyOnWriteArrayList.iterator().next()).shuffleMergeId);
        copyOnWriteArrayList.clear();
        Assert.assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 should be cleaned up", validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists());
        Assert.assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 should be cleaned up", new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(0, 4, 0)).exists());
        Assert.assertFalse("MergedBlock data file for shuffle 0 and shuffleMergeId 4 should be cleaned up", validateAndGetAppShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists());
    }

    @Test
    public void testFinalizationResultIsEmptyWhenTheServerDidNotReceiveAnyBlocks() {
        Assert.assertEquals("no partitions were merged", 0L, this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 1, 0)).reduceIds.length);
        Assert.assertTrue("shuffle 1 should be marked finalized", ((RemoteBlockPushResolver.AppShuffleMergePartitionsInfo) this.pushResolver.validateAndGetAppShuffleInfo("testApp").getShuffles().get(1)).isFinalized());
        removeApplication("testApp");
    }

    @Test
    public void testEmptyMergePartitionsAreNotReported() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 1, 0, 0, 100, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        Assert.assertEquals("no partitions were merged", 0L, this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 1, 0)).reduceIds.length);
        removeApplication("testApp");
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r7v5, types: [int[], int[][]] */
    @Test
    public void testAllBlocksAreRejectedWhenReceivedAfterFinalization() throws IOException {
        StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 1, 0, 0, 100, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[4]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        this.pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testApp", -1, 1, 0));
        StreamCallbackWithID receiveBlockDataAsStream2 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 1, 0, 0, 200, 0));
        BlockPushReturnCode fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        }).getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(), fromByteBuffer.returnCode);
        Assert.assertEquals(fromByteBuffer.failureBlockId, "shufflePush_1_0_0_200");
        StreamCallbackWithID receiveBlockDataAsStream3 = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream("testApp", -1, 1, 0, 1, 100, 0));
        BlockPushReturnCode fromByteBuffer2 = BlockTransferMessage.Decoder.fromByteBuffer(Assert.assertThrows(BlockPushNonFatalFailure.class, () -> {
            receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        }).getResponse());
        Assert.assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(), fromByteBuffer2.returnCode);
        Assert.assertEquals(fromByteBuffer2.failureBlockId, "shufflePush_1_0_1_100");
        validateChunks("testApp", 1, 0, 100, this.pushResolver.getMergedBlockMeta("testApp", 1, 0, 100), new int[]{4}, new int[]{new int[]{0}});
        removeApplication("testApp");
    }

    @Test
    public void testJsonSerializationOfPushShufflePartitionInfo() throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();
        RemoteBlockPushResolver.AppAttemptId appAttemptId = new RemoteBlockPushResolver.AppAttemptId("foo", 1);
        Assert.assertEquals(appAttemptId, (RemoteBlockPushResolver.AppAttemptId) objectMapper.readValue(objectMapper.writeValueAsString(appAttemptId), RemoteBlockPushResolver.AppAttemptId.class));
        RemoteBlockPushResolver.AppPathsInfo appPathsInfo = new RemoteBlockPushResolver.AppPathsInfo(new String[]{"/foo", "/bar"}, 64);
        Assert.assertEquals(appPathsInfo, (RemoteBlockPushResolver.AppPathsInfo) objectMapper.readValue(objectMapper.writeValueAsString(appPathsInfo), RemoteBlockPushResolver.AppPathsInfo.class));
        RemoteBlockPushResolver.AppAttemptShuffleMergeId appAttemptShuffleMergeId = new RemoteBlockPushResolver.AppAttemptShuffleMergeId("foo", 1, 1, 1);
        Assert.assertEquals(appAttemptShuffleMergeId, (RemoteBlockPushResolver.AppAttemptShuffleMergeId) objectMapper.readValue(objectMapper.writeValueAsString(appAttemptShuffleMergeId), RemoteBlockPushResolver.AppAttemptShuffleMergeId.class));
        Assert.assertEquals(appAttemptId, objectMapper.readValue("{\"appId\": \"foo\", \"attemptId\":\"1\"}", RemoteBlockPushResolver.AppAttemptId.class));
        Assert.assertEquals(appPathsInfo, objectMapper.readValue("{\"activeLocalDirs\": [\"/foo\", \"/bar\"], \"subDirsPerLocalDir\":\"64\"}", RemoteBlockPushResolver.AppPathsInfo.class));
        Assert.assertEquals(appAttemptShuffleMergeId, objectMapper.readValue("{\"appId\":\"foo\", \"attemptId\":\"1\", \"shuffleId\":\"1\", \"shuffleMergeId\":\"1\"}", RemoteBlockPushResolver.AppAttemptShuffleMergeId.class));
    }

    @Test
    public void testRemoveShuffleMerge() throws IOException, InterruptedException {
        final Semaphore semaphore = new Semaphore(0);
        String str = "testRemoveShuffleMerge";
        RemoteBlockPushResolver remoteBlockPushResolver = new RemoteBlockPushResolver(this.conf, null) { // from class: org.apache.spark.network.shuffle.RemoteBlockPushResolverSuite.5
            void closeAndDeleteOutdatedPartitions(RemoteBlockPushResolver.AppAttemptShuffleMergeId appAttemptShuffleMergeId, Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> map) {
                super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, map);
                semaphore.release();
            }

            void deleteMergedFiles(RemoteBlockPushResolver.AppAttemptShuffleMergeId appAttemptShuffleMergeId, RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo, int[] iArr, boolean z) {
                super.deleteMergedFiles(appAttemptShuffleMergeId, appShuffleInfo, iArr, z);
                semaphore.release();
            }
        };
        remoteBlockPushResolver.registerExecutor("testRemoveShuffleMerge", new ExecutorShuffleInfo(prepareLocalDirs(this.localDirs, "merge_manager"), 1, "shuffleManager:{\"mergeDir\": \"merge_manager\"}"));
        RemoteBlockPushResolver.AppShuffleInfo validateAndGetAppShuffleInfo = remoteBlockPushResolver.validateAndGetAppShuffleInfo("testRemoveShuffleMerge");
        StreamCallbackWithID receiveBlockDataAsStream = remoteBlockPushResolver.receiveBlockDataAsStream(new PushBlockStream("testRemoveShuffleMerge", -1, 0, 1, 0, 0, 0));
        receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        remoteBlockPushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testRemoveShuffleMerge", -1, 0, 1));
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
        Assert.assertTrue(new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists());
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
        remoteBlockPushResolver.removeShuffleMerge(new RemoveShuffleMerge("testRemoveShuffleMerge", -1, 0, 1));
        semaphore.tryAcquire(10L, TimeUnit.SECONDS);
        Assert.assertFalse(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
        Assert.assertFalse(new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists());
        Assert.assertFalse(validateAndGetAppShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
        StreamCallbackWithID receiveBlockDataAsStream2 = remoteBlockPushResolver.receiveBlockDataAsStream(new PushBlockStream("testRemoveShuffleMerge", -1, 1, 1, 0, 0, 0));
        receiveBlockDataAsStream2.onData(receiveBlockDataAsStream2.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream2.onComplete(receiveBlockDataAsStream2.getID());
        remoteBlockPushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testRemoveShuffleMerge", -1, 1, 1));
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(1, 1, 0).exists());
        Assert.assertTrue(new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(1, 1, 0)).exists());
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleDataFile(1, 1, 0).exists());
        remoteBlockPushResolver.removeShuffleMerge(new RemoveShuffleMerge("testRemoveShuffleMerge", -1, 1, -1));
        semaphore.tryAcquire(10L, TimeUnit.SECONDS);
        Assert.assertFalse(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(1, 1, 0).exists());
        Assert.assertFalse(new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists());
        Assert.assertFalse(validateAndGetAppShuffleInfo.getMergedShuffleDataFile(1, 1, 0).exists());
        StreamCallbackWithID receiveBlockDataAsStream3 = remoteBlockPushResolver.receiveBlockDataAsStream(new PushBlockStream("testRemoveShuffleMerge", -1, 2, 1, 0, 0, 0));
        receiveBlockDataAsStream3.onData(receiveBlockDataAsStream3.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream3.onComplete(receiveBlockDataAsStream3.getID());
        remoteBlockPushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testRemoveShuffleMerge", -1, 2, 1));
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists());
        Assert.assertTrue(new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(2, 1, 0)).exists());
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleDataFile(2, 1, 0).exists());
        Assert.assertEquals("Asked to remove old shuffle merged data for application testRemoveShuffleMerge shuffleId 2 shuffleMergeId 0, but current shuffleMergeId 1 ", ((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            remoteBlockPushResolver.removeShuffleMerge(new RemoveShuffleMerge(str, -1, 2, 0));
        })).getMessage());
        StreamCallbackWithID receiveBlockDataAsStream4 = remoteBlockPushResolver.receiveBlockDataAsStream(new PushBlockStream("testRemoveShuffleMerge", -1, 3, 1, 0, 0, 0));
        receiveBlockDataAsStream4.onData(receiveBlockDataAsStream4.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream4.onComplete(receiveBlockDataAsStream4.getID());
        remoteBlockPushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testRemoveShuffleMerge", -1, 3, 1));
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists());
        Assert.assertTrue(new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(3, 1, 0)).exists());
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleDataFile(3, 1, 0).exists());
        remoteBlockPushResolver.removeShuffleMerge(new RemoveShuffleMerge("testRemoveShuffleMerge", -1, 3, 2));
        semaphore.tryAcquire(10L, TimeUnit.SECONDS);
        Assert.assertFalse(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists());
        Assert.assertFalse(new File(validateAndGetAppShuffleInfo.getMergedShuffleIndexFilePath(3, 1, 0)).exists());
        Assert.assertFalse(validateAndGetAppShuffleInfo.getMergedShuffleDataFile(3, 1, 0).exists());
        StreamCallbackWithID receiveBlockDataAsStream5 = remoteBlockPushResolver.receiveBlockDataAsStream(new PushBlockStream("testRemoveShuffleMerge", -1, 4, 1, 0, 0, 0));
        receiveBlockDataAsStream5.onData(receiveBlockDataAsStream5.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream5.onComplete(receiveBlockDataAsStream5.getID());
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(4, 1, 0).exists());
        remoteBlockPushResolver.removeShuffleMerge(new RemoveShuffleMerge("testRemoveShuffleMerge", -1, 4, 1));
        semaphore.tryAcquire(10L, TimeUnit.SECONDS);
        Assert.assertFalse(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(4, 1, 0).exists());
        StreamCallbackWithID receiveBlockDataAsStream6 = remoteBlockPushResolver.receiveBlockDataAsStream(new PushBlockStream("testRemoveShuffleMerge", -1, 5, 1, 0, 0, 0));
        receiveBlockDataAsStream6.onData(receiveBlockDataAsStream6.getID(), ByteBuffer.wrap(new byte[2]));
        receiveBlockDataAsStream6.onComplete(receiveBlockDataAsStream6.getID());
        Assert.assertTrue(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(5, 1, 0).exists());
        remoteBlockPushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge("testRemoveShuffleMerge", -1, 5, 2));
        semaphore.tryAcquire(10L, TimeUnit.SECONDS);
        Assert.assertFalse(validateAndGetAppShuffleInfo.getMergedShuffleMetaFile(5, 1, 0).exists());
    }

    private void useTestFiles(final boolean z, final boolean z2) throws IOException {
        this.pushResolver = new RemoteBlockPushResolver(this.conf, null) { // from class: org.apache.spark.network.shuffle.RemoteBlockPushResolverSuite.6
            RemoteBlockPushResolver.AppShufflePartitionInfo newAppShufflePartitionInfo(RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo, int i, int i2, int i3, File file, File file2, File file3) throws IOException {
                return new RemoteBlockPushResolver.AppShufflePartitionInfo(new RemoteBlockPushResolver.AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, i, i2), i3, file, z ? new TestMergeShuffleFile(file2) : new RemoteBlockPushResolver.MergeShuffleFile(file2), z2 ? new TestMergeShuffleFile(file3) : new RemoteBlockPushResolver.MergeShuffleFile(file3));
            }
        };
        registerExecutor("testApp", prepareLocalDirs(this.localDirs, "merge_manager"), "shuffleManager:{\"mergeDir\": \"merge_manager\"}");
    }

    private Path[] createLocalDirs(int i) throws IOException {
        Path[] pathArr = new Path[i];
        for (int i2 = 0; i2 < pathArr.length; i2++) {
            pathArr[i2] = Files.createTempDirectory("shuffleMerge", new FileAttribute[0]);
            pathArr[i2].toFile().deleteOnExit();
        }
        return pathArr;
    }

    private void registerExecutor(String str, String[] strArr, String str2) {
        this.pushResolver.registerExecutor(str, new ExecutorShuffleInfo(strArr, 1, str2));
    }

    private String[] prepareLocalDirs(Path[] pathArr, String str) throws IOException {
        String[] strArr = new String[pathArr.length];
        for (int i = 0; i < pathArr.length; i++) {
            Files.createDirectories(pathArr[i].resolve(str + File.separator + "00"), new FileAttribute[0]);
            strArr[i] = pathArr[i].toFile().getPath() + File.separator + "blockmgr-193d8401";
        }
        return strArr;
    }

    private void removeApplication(String str) {
        this.pushResolver.applicationRemoved(str, false);
    }

    private void validateMergeStatuses(MergeStatuses mergeStatuses, int[] iArr, long[] jArr) {
        Assert.assertArrayEquals(iArr, mergeStatuses.reduceIds);
        Assert.assertArrayEquals(jArr, mergeStatuses.sizes);
    }

    private void validateChunks(String str, int i, int i2, int i3, MergedBlockMeta mergedBlockMeta, int[] iArr, int[][] iArr2) throws IOException {
        Assert.assertEquals("num chunks", iArr.length, mergedBlockMeta.getNumChunks());
        RoaringBitmap[] readChunkBitmaps = mergedBlockMeta.readChunkBitmaps();
        Assert.assertEquals("num of bitmaps", mergedBlockMeta.getNumChunks(), readChunkBitmaps.length);
        for (int i4 = 0; i4 < mergedBlockMeta.getNumChunks(); i4++) {
            RoaringBitmap roaringBitmap = readChunkBitmaps[i4];
            Assert.assertEquals("cardinality", iArr2[i4].length, roaringBitmap.getCardinality());
            Arrays.stream(iArr2[i4]).forEach(i5 -> {
                Assert.assertTrue(roaringBitmap.contains(i5));
            });
        }
        for (int i6 = 0; i6 < mergedBlockMeta.getNumChunks(); i6++) {
            Assert.assertEquals(iArr[i6], this.pushResolver.getMergedBlockData(str, i, i2, i3, i6).getLength());
        }
    }

    private void pushBlockHelper(String str, int i, PushBlock[] pushBlockArr) throws IOException {
        for (PushBlock pushBlock : pushBlockArr) {
            StreamCallbackWithID receiveBlockDataAsStream = this.pushResolver.receiveBlockDataAsStream(new PushBlockStream(str, i, pushBlock.shuffleId, pushBlock.shuffleMergeId, pushBlock.mapIndex, pushBlock.reduceId, 0));
            receiveBlockDataAsStream.onData(receiveBlockDataAsStream.getID(), pushBlock.buffer);
            receiveBlockDataAsStream.onComplete(receiveBlockDataAsStream.getID());
        }
    }

    private void verifyMetrics(long j, long j2, long j3, long j4, long j5, long j6, long j7) {
        Map metrics = this.pushResolver.getMetrics().getMetrics();
        Assert.assertEquals("bytes written", j, ((Meter) metrics.get("blockBytesWritten")).getCount());
        Assert.assertEquals("could not find opportunity responses", j2, ((Meter) metrics.get("blockAppendCollisions")).getCount());
        Assert.assertEquals("too late responses", j3, ((Meter) metrics.get("lateBlockPushes")).getCount());
        Assert.assertEquals("cached block bytes", j4, ((Counter) metrics.get("deferredBlockBytes")).getCount());
        Assert.assertEquals("deferred blocks", j5, ((Meter) metrics.get("deferredBlocks")).getCount());
        Assert.assertEquals("stale block pushes", j6, ((Meter) metrics.get("staleBlockPushes")).getCount());
        Assert.assertEquals("ignored block bytes", j7, ((Meter) metrics.get("ignoredBlockBytes")).getCount());
    }
}
