package org.apache.spark.shuffle;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.spark.HashPartitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.executor.CoarseGrainedExecutorBackend;
import org.apache.spark.internal.config.package$;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.server.BlockPushNonFatalFailure;
import org.apache.spark.network.shuffle.BlockPushingListener;
import org.apache.spark.network.shuffle.BlockStoreClient;
import org.apache.spark.network.shuffle.ErrorHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.shuffle.ShuffleBlockPusher;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockManagerId$;
import org.apache.spark.util.ThreadUtils$;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.Tag;
import org.scalatest.compatible.Assertion;
import scala.Array$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.HashSet;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.Tuple2Zipped$;
import scala.runtime.Tuple2Zipped$Ops$;

/* compiled from: ShuffleBlockPusherSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u001dc\u0001B\u0015+\u0001MBQA\u0010\u0001\u0005\u0002}B\u0011B\u0011\u0001A\u0002\u0003\u0007I\u0011B\"\t\u0013)\u0003\u0001\u0019!a\u0001\n\u0013Y\u0005\"\u0003+\u0001\u0001\u0004\u0005\t\u0015)\u0003E\u0011%\u0011\u0007\u00011AA\u0002\u0013%1\rC\u0005k\u0001\u0001\u0007\t\u0019!C\u0005W\"IQ\u000e\u0001a\u0001\u0002\u0003\u0006K\u0001\u001a\u0005\n_\u0002\u0001\r\u00111A\u0005\nAD\u0011\u0002\u001f\u0001A\u0002\u0003\u0007I\u0011B=\t\u0013m\u0004\u0001\u0019!A!B\u0013\t\b\"C?\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u007f\u0011-\tY\u0001\u0001a\u0001\u0002\u0004%I!!\u0004\t\u0015\u0005E\u0001\u00011A\u0001B\u0003&q\u0010C\u0006\u0002\u0016\u0001\u0001\r\u00111A\u0005\n\u0005]\u0001bCA\u0010\u0001\u0001\u0007\t\u0019!C\u0005\u0003CA1\"!\n\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002\u001a!I\u0011q\u0005\u0001A\u0002\u0013%\u0011\u0011\u0006\u0005\n\u0003#\u0002\u0001\u0019!C\u0005\u0003'B\u0001\"a\u0016\u0001A\u0003&\u00111\u0006\u0005\b\u00033\u0002A\u0011IA.\u0011\u001d\ti\u0006\u0001C!\u00037Bq!a\u0018\u0001\t\u0013\tY\u0006C\u0004\u0002b\u0001!I!a\u0019\t\u000f\u0005}\u0005\u0001\"\u0003\u0002\"\u001a1\u0011Q\u0016\u0001\u0005\u0003_C!\"!\u0006\u001a\u0005\u0003\u0005\u000b\u0011BA\r\u0011\u0019q\u0014\u0004\"\u0001\u00022\"I\u0011\u0011X\rC\u0002\u0013\u0005\u00111\u0018\u0005\t\u0003;L\u0002\u0015!\u0003\u0002>\"9\u0011q\\\r\u0005R\u0005\u0005\bbBAt3\u0011\u0005\u00111\f\u0005\b\u0003SLB\u0011KAv\r\u0019\u0011\u0019\u0003\u0001\u0003\u0003&!Q\u0011QC\u0011\u0003\u0002\u0003\u0006I!!\u0007\t\u0015\t\u001d\u0012E!A!\u0002\u0013\u0011I\u0003\u0003\u0004?C\u0011\u0005!q\u0006\u0005\n\u0003K\u000b#\u0019!C\u0001\u0005oA\u0001Ba\u0010\"A\u0003%!\u0011\b\u0005\b\u0003?\fC\u0011\u000bB!\u0011\u001d\u0011)%\tC!\u00037\u0012qc\u00155vM\u001adWM\u00117pG.\u0004Vo\u001d5feN+\u0018\u000e^3\u000b\u0005-b\u0013aB:ik\u001a4G.\u001a\u0006\u0003[9\nQa\u001d9be.T!a\f\u0019\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\t\u0014aA8sO\u000e\u
00011c\u0001\u00015qA\u0011QGN\u0007\u0002Y%\u0011q\u0007\f\u0002\u000e'B\f'o\u001b$v]N+\u0018\u000e^3\u0011\u0005ebT\"\u0001\u001e\u000b\u0005m\u0002\u0014!C:dC2\fG/Z:u\u0013\ti$H\u0001\nCK\u001a|'/Z!oI\u00063G/\u001a:FC\u000eD\u0017A\u0002\u001fj]&$h\bF\u0001A!\t\t\u0005!D\u0001+\u00031\u0011Gn\\2l\u001b\u0006t\u0017mZ3s+\u0005!\u0005CA#I\u001b\u00051%BA$-\u0003\u001d\u0019Ho\u001c:bO\u0016L!!\u0013$\u0003\u0019\tcwnY6NC:\fw-\u001a:\u0002!\tdwnY6NC:\fw-\u001a:`I\u0015\fHC\u0001'S!\ti\u0005+D\u0001O\u0015\u0005y\u0015!B:dC2\f\u0017BA)O\u0005\u0011)f.\u001b;\t\u000fM\u001b\u0011\u0011!a\u0001\t\u0006\u0019\u0001\u0010J\u0019\u0002\u001b\tdwnY6NC:\fw-\u001a:!Q\u0011!a\u000bX/\u0011\u0005]SV\"\u0001-\u000b\u0005e\u0003\u0014aB7pG.LGo\\\u0005\u00037b\u0013A!T8dW\u00061\u0011M\\:xKJ$\u0013AX\u0005\u0003?\u0002\f1CU#U+Js5kX*N\u0003J#vLT+M\u0019NS!!\u0019-\u0002\u000f\u0005s7o^3sg\u0006QA-\u001a9f]\u0012,gnY=\u0016\u0003\u0011\u0004R!N3hO\u001eL!A\u001a\u0017\u0003#MCWO\u001a4mK\u0012+\u0007/\u001a8eK:\u001c\u0017\u0010\u0005\u0002NQ&\u0011\u0011N\u0014\u0002\u0004\u0013:$\u0018A\u00043fa\u0016tG-\u001a8ds~#S-\u001d\u000b\u0003\u00192Dqa\u0015\u0004\u0002\u0002\u0003\u0007A-A\u0006eKB,g\u000eZ3oGf\u0004\u0003\u0006B\u0004W9v\u000bQb\u001d5vM\u001adWm\u00117jK:$X#A9\u0011\u0005I4X\"A:\u000b\u0005-\"(BA;-\u0003\u001dqW\r^<pe.L!a^:\u0003!\tcwnY6Ti>\u0014Xm\u00117jK:$\u0018!E:ik\u001a4G.Z\"mS\u0016tGo\u0018\u0013fcR\u0011AJ\u001f\u0005\b'&\t\t\u00111\u0001r\u00039\u0019\b.\u001e4gY\u0016\u001cE.[3oi\u0002BCA\u0003,];\u0006yQ\r_3dkR|'OQ1dW\u0016tG-F\u0001��!\u0011\t\t!a\u0002\u000e\u0005\u0005\r!bAA\u0003Y\u0005AQ\r_3dkR|'/\u0003\u0003\u0002\n\u0005\r!\u0001H\"pCJ\u001cXm\u0012:bS:,G-\u0012=fGV$xN\u001d\"bG.,g\u000eZ\u0001\u0014Kb,7-\u001e;pe\n\u000b7m[3oI~#S-\u001d\u000b\u0004\u0019\u0006=\u0001bB*\r\u0003\u0003\u0005\ra`\u0001\u0011Kb,7-\u001e;pe\n\u000b7m[3oI\u0002BC!\u0004,];\u0006!1m\u001c8g+\t\tI\u0002E\u00026\u00037I1!!\b-\u0005%\u0019\u0006/\u0019:l\u0007>tg-\u0001\u00
05d_:4w\fJ3r)\ra\u00151\u0005\u0005\t'>\t\t\u00111\u0001\u0002\u001a\u0005)1m\u001c8gA\u0005a\u0001/^:iK\u0012\u0014En\\2lgV\u0011\u00111\u0006\t\u0007\u0003[\t9$a\u000f\u000e\u0005\u0005=\"\u0002BA\u0019\u0003g\tq!\\;uC\ndWMC\u0002\u000269\u000b!bY8mY\u0016\u001cG/[8o\u0013\u0011\tI$a\f\u0003\u0017\u0005\u0013(/Y=Ck\u001a4WM\u001d\t\u0005\u0003{\tYE\u0004\u0003\u0002@\u0005\u001d\u0003cAA!\u001d6\u0011\u00111\t\u0006\u0004\u0003\u000b\u0012\u0014A\u0002\u001fs_>$h(C\u0002\u0002J9\u000ba\u0001\u0015:fI\u00164\u0017\u0002BA'\u0003\u001f\u0012aa\u0015;sS:<'bAA%\u001d\u0006\u0001\u0002/^:iK\u0012\u0014En\\2lg~#S-\u001d\u000b\u0004\u0019\u0006U\u0003\u0002C*\u0013\u0003\u0003\u0005\r!a\u000b\u0002\u001bA,8\u000f[3e\u00052|7m[:!\u0003)\u0011WMZ8sK\u0016\u000b7\r\u001b\u000b\u0002\u0019\u0006I\u0011M\u001a;fe\u0016\u000b7\r[\u0001 S:$XM]2faR\u0004Vo\u001d5fI\ncwnY6t\r>\u00148+^2dKN\u001c\u0018A\u0005<fe&4\u0017\u0010U;tQJ+\u0017/^3tiN$R\u0001TA3\u00033Cq!a\u001a\u0018\u0001\u0004\tI'\u0001\u0007qkND'+Z9vKN$8\u000f\u0005\u0004\u0002l\u0005U\u00141\u0010\b\u0005\u0003[\n\tH\u0004\u0003\u0002B\u0005=\u0014\"A(\n\u0007\u0005Md*A\u0004qC\u000e\\\u0017mZ3\n\t\u0005]\u0014\u0011\u0010\u0002\u0004'\u0016\f(bAA:\u001dB!\u0011QPAJ\u001d\u0011\ty(a$\u000f\t\u0005\u0005\u0015Q\u0012\b\u0005\u0003\u0007\u000bYI\u0004\u0003\u0002\u0006\u0006%e\u0002BA!\u0003\u000fK\u0011!M\u0005\u0003_AJ!!\f\u0018\n\u0005-b\u0013bAAIU\u0005\u00112\u000b[;gM2,'\t\\8dWB+8\u000f[3s\u0013\u0011\t)*a&\u0003\u0017A+8\u000f\u001b*fcV,7\u000f\u001e\u0006\u0004\u0003#S\u0003bBAN/\u0001\u0007\u0011QT\u0001\u000eKb\u0004Xm\u0019;fINK'0Z:\u0011\u000b\u0005-\u0014QO4\u00021Y,'/\u001b4z\u00052|7m\u001b)vg\"\u001cu.\u001c9mKR,G\rF\u0002M\u0003GCq!!*\u0019\u0001\u0004\t9+A\u0006cY>\u001c7\u000eU;tQ\u0016\u0014\bcA!\u0002*&\u0019\u00111\u0016\u0016\u0003%MCWO\u001a4mK\ncwnY6QkNDWM\u001d\u0002\u0017)\u0016\u001cHo\u00155vM\u001adWM\u00117pG.\u0004Vo\u001d5feN\u0019\u0011$a*\u0015\t\u0005M\u0016q\u0017\t\u0004\u0003kKR\"
\u0001\u0001\t\u000f\u0005U1\u00041\u0001\u0002\u001a\u0005)A/Y:lgV\u0011\u0011Q\u0018\t\u0007\u0003\u007f\u000bi-!5\u000e\u0005\u0005\u0005'\u0002BAb\u0003\u000b\f!bY8oGV\u0014(/\u001a8u\u0015\u0011\t9-!3\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u0003\u0017\fAA[1wC&!\u0011qZAa\u0005Ma\u0015N\\6fI\ncwnY6j]\u001e\fV/Z;f!\u0011\t\u0019.!7\u000e\u0005\u0005U'\u0002BAl\u0003\u0013\fA\u0001\\1oO&!\u00111\\Ak\u0005!\u0011VO\u001c8bE2,\u0017A\u0002;bg.\u001c\b%\u0001\u0006tk\nl\u0017\u000e\u001e+bg.$2\u0001TAr\u0011\u001d\t)O\ba\u0001\u0003#\fA\u0001^1tW\u0006y!/\u001e8QK:$\u0017N\\4UCN\\7/A\nde\u0016\fG/\u001a*fcV,7\u000f\u001e\"vM\u001a,'\u000f\u0006\u0006\u0002n\u0006e(Q\u0001B\u000b\u0005?\u0001B!a<\u0002v6\u0011\u0011\u0011\u001f\u0006\u0004\u0003g$\u0018A\u00022vM\u001a,'/\u0003\u0003\u0002x\u0006E(!D'b]\u0006<W\r\u001a\"vM\u001a,'\u000fC\u0004\u0002\u0016\u0001\u0002\r!a?\u0011\t\u0005u(\u0011A\u0007\u0003\u0003\u007fT1!a2u\u0013\u0011\u0011\u0019!a@\u0003\u001bQ\u0013\u0018M\\:q_J$8i\u001c8g\u0011\u001d\u00119\u0001\ta\u0001\u0005\u0013\t\u0001\u0002Z1uC\u001aKG.\u001a\t\u0005\u0005\u0017\u0011\t\"\u0004\u0002\u0003\u000e)!!qBAe\u0003\tIw.\u0003\u0003\u0003\u0014\t5!\u0001\u0002$jY\u0016DqAa\u0006!\u0001\u0004\u0011I\"\u0001\u0004pM\u001a\u001cX\r\u001e\t\u0004\u001b\nm\u0011b\u0001B\u000f\u001d\n!Aj\u001c8h\u0011\u001d\u0011\t\u0003\ta\u0001\u00053\ta\u0001\\3oORD'!G\"p]\u000e,(O]3oiR+7\u000f\u001e\"m_\u000e\\\u0007+^:iKJ\u001c2!IAZ\u0003%\u0019X-\\1qQ>\u0014X\r\u0005\u0003\u0002@\n-\u0012\u0002\u0002B\u0017\u0003\u0003\u0014\u0011bU3nCBDwN]3\u0015\r\tE\"1\u0007B\u001b!\r\t),\t\u0005\b\u0003+!\u0003\u0019AA\r\u0011\u001d\u00119\u0003\na\u0001\u0005S)\"A!\u000f\u0011\t\u0005}&1H\u0005\u0005\u0005{\t\tM\u0001\nUQJ,\u0017\r\u001a)p_2,\u00050Z2vi>\u0014\u0018\u0001\u00042m_\u000e\\\u0007+^:iKJ\u0004Cc\u0001'\u0003D!9\u0011Q]\u0014A\u0002\u0005E\u0017a\b8pi&4\u0017\u0010\u0012:jm\u0016\u0014\u0018IY8viB+8\u000f[\"p[BdW\r^5p]\u0002")
/* loaded from: input_file:org/apache/spark/shuffle/ShuffleBlockPusherSuite.class */
public class ShuffleBlockPusherSuite extends SparkFunSuite {

    // Mock collaborators; each is declared with the RETURNS_SMART_NULLS default answer.
    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    private BlockManager blockManager;

    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    private ShuffleDependency<Object, Object, Object> dependency;

    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    private BlockStoreClient shuffleClient;

    @Mock(answer = Answers.RETURNS_SMART_NULLS)
    private CoarseGrainedExecutorBackend executorBackend;
    // Replaced with a fresh SparkConf in beforeEach().
    private SparkConf conf;
    // Strings recorded by the pushBlocks answers; cleared in afterEach().
    private ArrayBuffer<String> pushedBlocks = new ArrayBuffer<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: ShuffleBlockPusherSuite.scala */
    /* loaded from: input_file:org/apache/spark/shuffle/ShuffleBlockPusherSuite$ConcurrentTestBlockPusher.class */
    /**
     * Test pusher that hands submitted tasks to its own executor (built with
     * {@code newDaemonFixedThreadPool(1, "test-block-pusher")}) and releases the supplied
     * semaphore after each push-completion notification.
     */
    public class ConcurrentTestBlockPusher extends TestShuffleBlockPusher {
        private final Semaphore semaphore;
        private final ThreadPoolExecutor blockPusher;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public ConcurrentTestBlockPusher(ShuffleBlockPusherSuite shuffleBlockPusherSuite, SparkConf sparkConf, Semaphore semaphore) {
            super(shuffleBlockPusherSuite, sparkConf);
            this.semaphore = semaphore;
            this.blockPusher = ThreadUtils$.MODULE$.newDaemonFixedThreadPool(1, "test-block-pusher");
        }

        /** Returns the executor that runs submitted tasks. */
        public ThreadPoolExecutor blockPusher() {
            return blockPusher;
        }

        /** Runs the task on the executor instead of queueing it as the parent class does. */
        @Override // org.apache.spark.shuffle.ShuffleBlockPusherSuite.TestShuffleBlockPusher
        public void submitTask(Runnable runnable) {
            ThreadPoolExecutor executor = blockPusher();
            executor.execute(runnable);
        }

        /** Delegates to the superclass, then releases one semaphore permit. */
        public void notifyDriverAboutPushCompletion() {
            super.notifyDriverAboutPushCompletion();
            semaphore.release();
        }

        /** Compiler-generated accessor for the enclosing suite instance. */
        public /* synthetic */ ShuffleBlockPusherSuite org$apache$spark$shuffle$ShuffleBlockPusherSuite$ConcurrentTestBlockPusher$$$outer() {
            return this.$outer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: ShuffleBlockPusherSuite.scala */
    /* loaded from: input_file:org/apache/spark/shuffle/ShuffleBlockPusherSuite$TestShuffleBlockPusher.class */
    /**
     * Test pusher that queues submitted tasks so a test can run them on demand, and that
     * replaces request buffers with Mockito mocks backed by zero-filled byte arrays.
     */
    public class TestShuffleBlockPusher extends ShuffleBlockPusher {
        public final /* synthetic */ ShuffleBlockPusherSuite $outer;
        private final LinkedBlockingQueue<Runnable> tasks;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public TestShuffleBlockPusher(ShuffleBlockPusherSuite shuffleBlockPusherSuite, SparkConf sparkConf) {
            super(sparkConf);
            if (shuffleBlockPusherSuite == null) {
                throw null;
            }
            this.$outer = shuffleBlockPusherSuite;
            this.tasks = new LinkedBlockingQueue<>();
        }

        /** Returns the queue of tasks that have been submitted but not yet run. */
        public LinkedBlockingQueue<Runnable> tasks() {
            return tasks;
        }

        /** Queues the task rather than executing it. */
        public void submitTask(Runnable runnable) {
            LinkedBlockingQueue<Runnable> queue = tasks();
            queue.add(runnable);
        }

        /** Takes and runs queued tasks on the calling thread until the queue is empty. */
        public void runPendingTasks() {
            while (true) {
                if (tasks().isEmpty()) {
                    return;
                }
                Runnable next = tasks().take();
                next.run();
            }
        }

        /**
         * Returns a mocked buffer whose {@code nioByteBuffer()} yields a wrapped array of
         * {@code j2} bytes; the other three arguments are not used.
         */
        public ManagedBuffer createRequestBuffer(TransportConf transportConf, File file, long j, long j2) {
            ManagedBuffer stubBuffer = (ManagedBuffer) Mockito.mock(ManagedBuffer.class);
            Mockito.when(stubBuffer.nioByteBuffer()).thenReturn(ByteBuffer.wrap(new byte[(int) j2]));
            return stubBuffer;
        }

        /** Compiler-generated accessor for the enclosing suite instance. */
        public /* synthetic */ ShuffleBlockPusherSuite org$apache$spark$shuffle$ShuffleBlockPusherSuite$TestShuffleBlockPusher$$$outer() {
            return this.$outer;
        }
    }

    /** Returns the suite's current BlockManager field value. */
    private BlockManager blockManager() {
        return blockManager;
    }

    /** Scala-style setter: replaces the suite's BlockManager field. */
    private void blockManager_$eq(BlockManager blockManager) {
        this.blockManager = blockManager;
    }

    /** Returns the suite's current ShuffleDependency field value. */
    private ShuffleDependency<Object, Object, Object> dependency() {
        return dependency;
    }

    /** Scala-style setter: replaces the suite's ShuffleDependency field. */
    private void dependency_$eq(ShuffleDependency<Object, Object, Object> shuffleDependency) {
        this.dependency = shuffleDependency;
    }

    /** Returns the suite's current BlockStoreClient field value. */
    private BlockStoreClient shuffleClient() {
        return shuffleClient;
    }

    /** Scala-style setter: replaces the suite's BlockStoreClient field. */
    private void shuffleClient_$eq(BlockStoreClient blockStoreClient) {
        this.shuffleClient = blockStoreClient;
    }

    /** Returns the suite's current CoarseGrainedExecutorBackend field value. */
    private CoarseGrainedExecutorBackend executorBackend() {
        return executorBackend;
    }

    /** Scala-style setter: replaces the suite's CoarseGrainedExecutorBackend field. */
    private void executorBackend_$eq(CoarseGrainedExecutorBackend coarseGrainedExecutorBackend) {
        this.executorBackend = coarseGrainedExecutorBackend;
    }

    /** Returns the SparkConf currently held by the suite. */
    private SparkConf conf() {
        return conf;
    }

    /** Scala-style setter: replaces the suite's SparkConf field. */
    private void conf_$eq(SparkConf sparkConf) {
        this.conf = sparkConf;
    }

    /** Returns the buffer of strings recorded by the pushBlocks answers. */
    private ArrayBuffer<String> pushedBlocks() {
        return pushedBlocks;
    }

    /** Scala-style setter: replaces the suite's pushedBlocks buffer. */
    private void pushedBlocks_$eq(ArrayBuffer<String> arrayBuffer) {
        this.pushedBlocks = arrayBuffer;
    }

    /**
     * Per-test setup: builds a fresh SparkConf, initialises the {@code @Mock} fields, stubs the
     * shuffle dependency, and installs a mocked SparkEnv that returns the suite's conf,
     * block manager and executor backend.
     */
    @Override // org.apache.spark.SparkFunSuite
    public void beforeEach() {
        BeforeAndAfterEach.beforeEach$(this);
        conf_$eq(new SparkConf(false));
        // Initialise the @Mock-annotated fields; the handle returned by openMocks is closed immediately.
        MockitoAnnotations.openMocks(this).close();
        // Dependency stubs: shuffle id 0, an 8-partition HashPartitioner, a JavaSerializer, one merger location.
        Mockito.when(BoxesRunTime.boxToInteger(dependency().shuffleId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(dependency().partitioner()).thenReturn(new HashPartitioner(8));
        Mockito.when(dependency().serializer()).thenReturn(new JavaSerializer(conf()));
        Mockito.when(dependency().getMergerLocs()).thenReturn(new $colon.colon(BlockManagerId$.MODULE$.apply("test-client", "test-client", 1, BlockManagerId$.MODULE$.apply$default$4()), Nil$.MODULE$));
        SparkEnv sparkEnv = (SparkEnv) Mockito.mock(SparkEnv.class);
        Mockito.when(sparkEnv.conf()).thenReturn(conf());
        Mockito.when(sparkEnv.blockManager()).thenReturn(blockManager());
        // Publish the mocked environment through the SparkEnv companion object.
        SparkEnv$.MODULE$.set(sparkEnv);
        // NOTE(review): presumably get() returns the mock installed on the previous line — confirm.
        Mockito.when(SparkEnv$.MODULE$.get().executorBackend()).thenReturn(new Some(executorBackend()));
        Mockito.when(blockManager().blockStoreClient()).thenReturn(shuffleClient());
    }

    /** Per-test teardown: empties the pushedBlocks buffer, then runs the trait's afterEach. */
    @Override // org.apache.spark.SparkFunSuite, org.apache.spark.LocalSparkContext
    public void afterEach() {
        pushedBlocks().clear();
        BeforeAndAfterEach.afterEach$(this);
    }

    /**
     * Stubs {@code shuffleClient.pushBlocks} for any arguments so that each invocation is
     * handled by {@code $anonfun$interceptPushedBlocksForSuccess$1}.
     */
    private void interceptPushedBlocksForSuccess() {
        // The mock call is issued as its own statement and Mockito.when receives BoxedUnit.UNIT.
        // NOTE(review): presumably Mockito ties the answer to the most recent mock invocation, so
        // these two statements must stay adjacent and in this order — confirm.
        shuffleClient().pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            $anonfun$interceptPushedBlocksForSuccess$1(this, invocationOnMock);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Pairs the push requests in {@code seq} with the values in {@code seq2} and passes each pair
     * to {@code $anonfun$verifyPushRequests$1}, which asserts on the request's size.
     */
    private void verifyPushRequests(Seq<ShuffleBlockPusher.PushRequest> seq, Seq<Object> seq2) {
        Tuple2Zipped$.MODULE$.foreach$extension(Tuple2Zipped$Ops$.MODULE$.zipped$extension(Predef$.MODULE$.tuple2ToZippedOps(new Tuple2(seq, seq2)), Predef$.MODULE$.$conforms(), Predef$.MODULE$.$conforms()), (pushRequest, obj) -> {
            // The second sequence is erased to Object, so each element is unboxed to an int here.
            return $anonfun$verifyPushRequests$1(pushRequest, BoxesRunTime.unboxToInt(obj));
        });
    }

    /**
     * Verifies that the mocked executor backend received exactly one
     * {@code notifyDriverAboutPushCompletion(shuffleId, 0, 0)} call, then asserts that the given
     * pusher reports {@code isPushCompletionNotified()}.
     */
    private void verifyBlockPushCompleted(ShuffleBlockPusher shuffleBlockPusher) {
        ((CoarseGrainedExecutorBackend) Mockito.verify(executorBackend(), Mockito.times(1))).notifyDriverAboutPushCompletion(dependency().shuffleId(), 0, 0);
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(shuffleBlockPusher.isPushCompletionNotified(), "blockPusher.isPushCompletionNotified", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 104));
    }

    /** Synthetic conversion: passes the String array through {@code Predef.refArrayOps}. */
    public static final /* synthetic */ Object[] $anonfun$interceptPushedBlocksForSuccess$2(String[] strArr) {
        Object[] wrapped = Predef$.MODULE$.refArrayOps(strArr);
        return wrapped;
    }

    /** Synthetic conversion: passes the ManagedBuffer array through {@code Predef.refArrayOps}. */
    public static final /* synthetic */ Object[] $anonfun$interceptPushedBlocksForSuccess$3(ManagedBuffer[] managedBufferArr) {
        Object[] wrapped = Predef$.MODULE$.refArrayOps(managedBufferArr);
        return wrapped;
    }

    /**
     * Answer body for the stub set up in interceptPushedBlocksForSuccess: appends the strings in
     * invocation argument 2 to the suite's pushedBlocks buffer, then walks arguments 2 and 3
     * pairwise and calls {@code onBlockPushSuccess} on the listener in argument 4 for each pair.
     */
    public static final /* synthetic */ void $anonfun$interceptPushedBlocksForSuccess$1(ShuffleBlockPusherSuite shuffleBlockPusherSuite, InvocationOnMock invocationOnMock) {
        String[] strArr = (String[]) invocationOnMock.getArguments()[2];
        shuffleBlockPusherSuite.pushedBlocks().$plus$plus$eq(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)));
        ManagedBuffer[] managedBufferArr = (ManagedBuffer[]) invocationOnMock.getArguments()[3];
        BlockPushingListener blockPushingListener = (BlockPushingListener) invocationOnMock.getArguments()[4];
        // The two middle lambdas wrap each array as ArrayOps so Tuple2Zipped can traverse them together.
        Tuple2Zipped$.MODULE$.foreach$extension(Tuple2Zipped$Ops$.MODULE$.zipped$extension(Predef$.MODULE$.tuple2ToZippedOps(new Tuple2(strArr, managedBufferArr)), strArr2 -> {
            return new ArrayOps.ofRef($anonfun$interceptPushedBlocksForSuccess$2(strArr2));
        }, managedBufferArr2 -> {
            return new ArrayOps.ofRef($anonfun$interceptPushedBlocksForSuccess$3(managedBufferArr2));
        }), (str, managedBuffer) -> {
            blockPushingListener.onBlockPushSuccess(str, managedBuffer);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds a scalactic size comparison between {@code pushRequest.size()} and {@code i} and
     * hands it to {@code macroAssert}, returning the resulting Assertion.
     */
    public static final /* synthetic */ Assertion $anonfun$verifyPushRequests$1(ShuffleBlockPusher.PushRequest pushRequest, int i) {
        return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushRequest, "size", BoxesRunTime.boxToInteger(pushRequest.size()), BoxesRunTime.boxToInteger(i), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 96));
    }

    /** Reports a successful push of {@code str} to the listener, using a freshly mocked buffer. */
    public static final /* synthetic */ void $anonfun$new$12(BlockPushingListener blockPushingListener, String str) {
        ManagedBuffer mockedBuffer = (ManagedBuffer) Mockito.mock(ManagedBuffer.class);
        blockPushingListener.onBlockPushSuccess(str, mockedBuffer);
    }

    /**
     * Answer body: waits on the latch, sleeps 500 ms, then reports success to the listener
     * (invocation argument 4) for every string in invocation argument 2.
     */
    public static final /* synthetic */ void $anonfun$new$11(CountDownLatch countDownLatch, InvocationOnMock invocationOnMock) {
        String[] blockIds = (String[]) invocationOnMock.getArguments()[2];
        BlockPushingListener listener = (BlockPushingListener) invocationOnMock.getArguments()[4];
        countDownLatch.await();
        Thread.sleep(500L);
        for (String blockId : blockIds) {
            $anonfun$new$12(listener, blockId);
        }
    }

    /** Reports a successful push of {@code str} to the listener, using a freshly mocked buffer. */
    public static final /* synthetic */ void $anonfun$new$14(BlockPushingListener blockPushingListener, String str) {
        ManagedBuffer mockedBuffer = (ManagedBuffer) Mockito.mock(ManagedBuffer.class);
        blockPushingListener.onBlockPushSuccess(str, mockedBuffer);
    }

    /**
     * Answer body: waits on the latch, then reports success to the listener (invocation
     * argument 4) for every string in invocation argument 2.
     */
    public static final /* synthetic */ void $anonfun$new$13(CountDownLatch countDownLatch, InvocationOnMock invocationOnMock) {
        String[] blockIds = (String[]) invocationOnMock.getArguments()[2];
        BlockPushingListener listener = (BlockPushingListener) invocationOnMock.getArguments()[4];
        countDownLatch.await();
        for (String blockId : blockIds) {
            $anonfun$new$14(listener, blockId);
        }
    }

    /** Synthetic conversion: passes the String array through {@code Predef.refArrayOps}. */
    public static final /* synthetic */ Object[] $anonfun$new$24(String[] strArr) {
        Object[] wrapped = Predef$.MODULE$.refArrayOps(strArr);
        return wrapped;
    }

    /** Synthetic conversion: passes the ManagedBuffer array through {@code Predef.refArrayOps}. */
    public static final /* synthetic */ Object[] $anonfun$new$25(ManagedBuffer[] managedBufferArr) {
        Object[] wrapped = Predef$.MODULE$.refArrayOps(managedBufferArr);
        return wrapped;
    }

    /**
     * Fail-once handler: while the flag is set, clears it and reports a
     * BLOCK_APPEND_COLLISION_DETECTED failure for {@code str}; otherwise records {@code str}
     * in pushedBlocks and reports success with a mocked buffer.
     */
    public static final /* synthetic */ void $anonfun$new$34(ShuffleBlockPusherSuite shuffleBlockPusherSuite, BooleanRef booleanRef, BlockPushingListener blockPushingListener, String str) {
        if (!booleanRef.elem) {
            shuffleBlockPusherSuite.pushedBlocks().$plus$eq(str);
            blockPushingListener.onBlockPushSuccess(str, (ManagedBuffer) Mockito.mock(ManagedBuffer.class));
            return;
        }
        // Flag still set: consume it and fail this block.
        booleanRef.elem = false;
        blockPushingListener.onBlockPushFailure(str, new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, ""));
    }

    /**
     * Answer body: applies {@code $anonfun$new$34} to every string in invocation argument 2,
     * using the listener from invocation argument 4.
     */
    public static final /* synthetic */ void $anonfun$new$33(ShuffleBlockPusherSuite shuffleBlockPusherSuite, BooleanRef booleanRef, InvocationOnMock invocationOnMock) {
        String[] blockIds = (String[]) invocationOnMock.getArguments()[2];
        BlockPushingListener listener = (BlockPushingListener) invocationOnMock.getArguments()[4];
        for (String blockId : blockIds) {
            $anonfun$new$34(shuffleBlockPusherSuite, booleanRef, listener, blockId);
        }
    }

    /**
     * Fail-once handler: while the flag is set, clears it and reports a TOO_LATE_BLOCK_PUSH
     * failure for {@code str}; otherwise records {@code str} in pushedBlocks and reports
     * success with a mocked buffer.
     */
    public static final /* synthetic */ void $anonfun$new$38(ShuffleBlockPusherSuite shuffleBlockPusherSuite, BooleanRef booleanRef, BlockPushingListener blockPushingListener, String str) {
        if (!booleanRef.elem) {
            shuffleBlockPusherSuite.pushedBlocks().$plus$eq(str);
            blockPushingListener.onBlockPushSuccess(str, (ManagedBuffer) Mockito.mock(ManagedBuffer.class));
            return;
        }
        // Flag still set: consume it and fail this block.
        booleanRef.elem = false;
        blockPushingListener.onBlockPushFailure(str, new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, ""));
    }

    /**
     * Answer body: applies {@code $anonfun$new$38} to every string in invocation argument 2,
     * using the listener from invocation argument 4.
     */
    public static final /* synthetic */ void $anonfun$new$37(ShuffleBlockPusherSuite shuffleBlockPusherSuite, BooleanRef booleanRef, InvocationOnMock invocationOnMock) {
        String[] blockIds = (String[]) invocationOnMock.getArguments()[2];
        BlockPushingListener listener = (BlockPushingListener) invocationOnMock.getArguments()[4];
        for (String blockId : blockIds) {
            $anonfun$new$38(shuffleBlockPusherSuite, booleanRef, listener, blockId);
        }
    }

    /** Reports a push failure for {@code str} caused by a RuntimeException wrapping a ConnectException. */
    public static final /* synthetic */ void $anonfun$new$42(BlockPushingListener blockPushingListener, String str) {
        ConnectException rootCause = new ConnectException();
        RuntimeException failure = new RuntimeException(rootCause);
        blockPushingListener.onBlockPushFailure(str, failure);
    }

    /**
     * Answer body: appends every string in invocation argument 2 to pushedBlocks, then reports
     * a failure for each of them through {@code $anonfun$new$42}.
     */
    public static final /* synthetic */ void $anonfun$new$41(ShuffleBlockPusherSuite shuffleBlockPusherSuite, InvocationOnMock invocationOnMock) {
        String[] blockIds = (String[]) invocationOnMock.getArguments()[2];
        shuffleBlockPusherSuite.pushedBlocks().$plus$plus$eq(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(blockIds)));
        BlockPushingListener listener = (BlockPushingListener) invocationOnMock.getArguments()[4];
        for (String blockId : blockIds) {
            $anonfun$new$42(listener, blockId);
        }
    }

    /** Reports a push failure for {@code str}: an IOException caused by a FileNotFoundException. */
    public static final /* synthetic */ void $anonfun$new$46(BlockPushingListener blockPushingListener, String str) {
        FileNotFoundException rootCause = new FileNotFoundException("file not found");
        IOException failure = new IOException("Failed to send RPC", rootCause);
        blockPushingListener.onBlockPushFailure(str, failure);
    }

    /**
     * Answer body: reports a failure through {@code $anonfun$new$46} for every string in
     * invocation argument 2, using the listener from invocation argument 4.
     */
    public static final /* synthetic */ void $anonfun$new$45(InvocationOnMock invocationOnMock) {
        String[] blockIds = (String[]) invocationOnMock.getArguments()[2];
        BlockPushingListener listener = (BlockPushingListener) invocationOnMock.getArguments()[4];
        for (String blockId : blockIds) {
            $anonfun$new$46(listener, blockId);
        }
    }

    public ShuffleBlockPusherSuite() {
        test("A batch of blocks is limited by maxBlocksBatchSize", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.interceptPushedBlocksForSuccess();
            this.conf().set("spark.shuffle.push.maxBlockBatchSize", "1m");
            this.conf().set("spark.shuffle.push.maxBlockSizeToPush", "2048k");
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            Seq seq = (Seq) this.dependency().getMergerLocs().map(blockManagerId -> {
                return BlockManagerId$.MODULE$.apply("", blockManagerId.host(), blockManagerId.port(), BlockManagerId$.MODULE$.apply$default$4());
            }, Seq$.MODULE$.canBuildFrom());
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 5L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            Seq<ShuffleBlockPusher.PushRequest> prepareBlockPushRequests = testShuffleBlockPusher.prepareBlockPushRequests(5, 0, 0, 0, (File) Mockito.mock(File.class), new long[]{2, 2, 2, 2097152, 2097152}, seq, (TransportConf) Mockito.mock(TransportConf.class));
            testShuffleBlockPusher.runPendingTasks();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(prepareBlockPushRequests, "length", BoxesRunTime.boxToInteger(prepareBlockPushRequests.length()), BoxesRunTime.boxToInteger(3), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 120));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
            this.verifyPushRequests(prepareBlockPushRequests, Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{6, 2097152, 2097152})));
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 107));
        test("Large blocks are excluded in the preparation", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.interceptPushedBlocksForSuccess();
            this.conf().set("spark.shuffle.push.maxBlockSizeToPush", "1k");
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            Seq seq = (Seq) this.dependency().getMergerLocs().map(blockManagerId -> {
                return BlockManagerId$.MODULE$.apply("", blockManagerId.host(), blockManagerId.port(), BlockManagerId$.MODULE$.apply$default$4());
            }, Seq$.MODULE$.canBuildFrom());
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 5L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            Seq<ShuffleBlockPusher.PushRequest> prepareBlockPushRequests = testShuffleBlockPusher.prepareBlockPushRequests(5, 0, 0, 0, (File) Mockito.mock(File.class), new long[]{2, 2, 2, 1028, 1024}, seq, (TransportConf) Mockito.mock(TransportConf.class));
            testShuffleBlockPusher.runPendingTasks();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(prepareBlockPushRequests, "length", BoxesRunTime.boxToInteger(prepareBlockPushRequests.length()), BoxesRunTime.boxToInteger(2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 135));
            this.verifyPushRequests(prepareBlockPushRequests, Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{6, 1024})));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 125));
        // Registers a test that sets spark.reducer.maxBlocksInFlightPerAddress to "1" and checks that
        // prepareBlockPushRequests, given five blocks of size 2, yields five push requests of size 2 each.
        test("Number of blocks in a push request are limited by maxBlocksInFlightPerAddress ", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // NOTE(review): helper body is outside this view; by its name it presumably stubs pushBlocks to succeed -- confirm.
            this.interceptPushedBlocksForSuccess();
            this.conf().set("spark.reducer.maxBlocksInFlightPerAddress", "1");
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            // Rebuild each merger location with an empty first argument, keeping only host and port.
            Seq seq = (Seq) this.dependency().getMergerLocs().map(blockManagerId -> {
                return BlockManagerId$.MODULE$.apply("", blockManagerId.host(), blockManagerId.port(), BlockManagerId$.MODULE$.apply$default$4());
            }, Seq$.MODULE$.canBuildFrom());
            // Start a push with one block length of 5 per partition of the dependency's partitioner.
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 5L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            // NOTE(review): the meaning of the leading (5, 0, 0, 0) arguments is not visible here -- confirm against prepareBlockPushRequests.
            Seq<ShuffleBlockPusher.PushRequest> prepareBlockPushRequests = testShuffleBlockPusher.prepareBlockPushRequests(5, 0, 0, 0, (File) Mockito.mock(File.class), new long[]{2, 2, 2, 2, 2}, seq, (TransportConf) Mockito.mock(TransportConf.class));
            testShuffleBlockPusher.runPendingTasks();
            // Expect exactly five requests.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(prepareBlockPushRequests, "length", BoxesRunTime.boxToInteger(prepareBlockPushRequests.length()), BoxesRunTime.boxToInteger(5), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 150));
            this.verifyPushRequests(prepareBlockPushRequests, Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{2, 2, 2, 2, 2})));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 140));
        // Registers the SPARK-33701 test: with two merger locations ("test-client" and "slow-client") and a
        // ConcurrentTestBlockPusher, it waits on a semaphore and then asserts bytesInFlight <= 0 and that the
        // prepared push requests have sizes 6 and 4.
        test("SPARK-33701: Ensure all the blocks are pushed before notifying driver about push completion", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.conf().set(package$.MODULE$.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS(), BoxesRunTime.boxToInteger(12));
            this.conf().set("spark.shuffle.push.maxBlockBatchSize", "20b");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            Mockito.when(this.dependency().getMergerLocs()).thenReturn(new $colon.colon(BlockManagerId$.MODULE$.apply("test-client", "test-client", 1, BlockManagerId$.MODULE$.apply$default$4()), new $colon.colon(BlockManagerId$.MODULE$.apply("slow-client", "slow-client", 1, BlockManagerId$.MODULE$.apply$default$4()), Nil$.MODULE$)));
            // Decompiled stubbing of a void-like call: the pushBlocks invocation on the mock must stay
            // immediately before Mockito.when(...), which attaches the answer to that last invocation.
            // Answer for pushes whose first argument equals "slow-client" (answer body not visible in this view).
            this.shuffleClient().pushBlocks((String) ArgumentMatchers.eq("slow-client"), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
                $anonfun$new$11(countDownLatch, invocationOnMock);
                return BoxedUnit.UNIT;
            });
            // Answer for pushes whose first argument equals "test-client" (answer body not visible in this view).
            this.shuffleClient().pushBlocks((String) ArgumentMatchers.eq("test-client"), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock2 -> {
                $anonfun$new$13(countDownLatch, invocationOnMock2);
                return BoxedUnit.UNIT;
            });
            // Semaphore starts with zero permits; it is handed to the pusher and acquired below.
            Semaphore semaphore = new Semaphore(0);
            ConcurrentTestBlockPusher concurrentTestBlockPusher = new ConcurrentTestBlockPusher(this, this.conf(), semaphore);
            Seq seq = (Seq) this.dependency().getMergerLocs().map(blockManagerId -> {
                return BlockManagerId$.MODULE$.apply("", blockManagerId.host(), blockManagerId.port(), BlockManagerId$.MODULE$.apply$default$4());
            }, Seq$.MODULE$.canBuildFrom());
            concurrentTestBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 5L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            Seq<ShuffleBlockPusher.PushRequest> prepareBlockPushRequests = concurrentTestBlockPusher.prepareBlockPushRequests(5, 0, 0, 0, (File) Mockito.mock(File.class), new long[]{2, 2, 2, 2, 2}, seq, (TransportConf) Mockito.mock(TransportConf.class));
            // NOTE(review): the latch was created with a count of 1, so the second countDown() below has no
            // further effect -- confirm whether two calls are intentional.
            countDownLatch.countDown();
            countDownLatch.countDown();
            // Blocks until a permit is released; presumably the pusher releases it when the push completes -- confirm.
            semaphore.acquire();
            long bytesInFlight = concurrentTestBlockPusher.bytesInFlight();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(bytesInFlight), "<=", BoxesRunTime.boxToInteger(0), bytesInFlight <= ((long) 0), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 194));
            // Expect exactly two push requests.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(prepareBlockPushRequests, "length", BoxesRunTime.boxToInteger(prepareBlockPushRequests.length()), BoxesRunTime.boxToInteger(2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 195));
            this.verifyPushRequests(prepareBlockPushRequests, Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{6, 4})));
            this.verifyBlockPushCompleted(concurrentTestBlockPusher);
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 156));
        // Registers a test that pushes one block of length 2 per partition with the default configuration and
        // expects a single pushBlocks call covering as many blocks as there are partitions.
        test("Basic block push", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // NOTE(review): helper body is outside this view; by its name it presumably stubs pushBlocks to succeed -- confirm.
            this.interceptPushedBlocksForSuccess();
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 2L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // Exactly one pushBlocks invocation on the mocked shuffle client, with any arguments.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(1))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks = this.pushedBlocks();
            // The number of recorded pushed blocks must equal the partition count.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushedBlocks, "length", BoxesRunTime.boxToInteger(pushedBlocks.length()), BoxesRunTime.boxToInteger(this.dependency().partitioner().numPartitions()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 208));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
            ShuffleBlockPusher$.MODULE$.stop();
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 200));
        // Registers a test that sets spark.shuffle.push.maxBlockSizeToPush to "1k" and pushes eight blocks,
        // seven of length 2 and one of length 1100; it expects one fewer pushed block than partitions.
        test("Large blocks are skipped for push", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.conf().set("spark.shuffle.push.maxBlockSizeToPush", "1k");
            // NOTE(review): helper body is outside this view; by its name it presumably stubs pushBlocks to succeed -- confirm.
            this.interceptPushedBlocksForSuccess();
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            // The last block (1100) is the only one larger than the configured "1k" limit.
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), new long[]{2, 2, 2, 2, 2, 2, 2, 1100}, this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // Exactly one pushBlocks invocation on the mocked shuffle client, with any arguments.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(1))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks = this.pushedBlocks();
            // Expect numPartitions - 1 pushed blocks.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushedBlocks, "length", BoxesRunTime.boxToInteger(pushedBlocks.length()), BoxesRunTime.boxToInteger(this.dependency().partitioner().numPartitions() - 1), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 222));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
            ShuffleBlockPusher$.MODULE$.stop();
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 213));
        // Registers a test that sets spark.reducer.maxBlocksInFlightPerAddress to "1", pushes one block of
        // length 2 per partition, and expects eight pushBlocks calls with every partition's block pushed.
        test("Number of blocks in flight per address are limited by maxBlocksInFlightPerAddress", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.conf().set("spark.reducer.maxBlocksInFlightPerAddress", "1");
            // NOTE(review): helper body is outside this view; by its name it presumably stubs pushBlocks to succeed -- confirm.
            this.interceptPushedBlocksForSuccess();
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 2L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // Exactly eight pushBlocks invocations on the mocked shuffle client.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(8))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks = this.pushedBlocks();
            // The number of recorded pushed blocks must equal the partition count.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushedBlocks, "length", BoxesRunTime.boxToInteger(pushedBlocks.length()), BoxesRunTime.boxToInteger(this.dependency().partitioner().numPartitions()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 236));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
            ShuffleBlockPusher$.MODULE$.stop();
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 227));
        // Registers a test that sets spark.reducer.maxBlocksInFlightPerAddress to "2" and uses a custom
        // pushBlocks answer that, on its first invocation, acknowledges only the first of the two blocks.
        // It then checks that only one request (two blocks) has gone out until the held-back block is
        // acknowledged manually, after which four requests (eight blocks) are expected in total.
        test("Hit maxBlocksInFlightPerAddress limit so that the blocks are deferred", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.conf().set("spark.reducer.maxBlocksInFlightPerAddress", "2");
            // create holds the block id that was withheld; create2 holds the listener to acknowledge it with.
            ObjectRef create = ObjectRef.create((Object) null);
            ObjectRef create2 = ObjectRef.create((Object) null);
            // Decompiled stubbing of a void-like call: the pushBlocks invocation on the mock must stay
            // immediately before Mockito.when(...), which attaches the answer to that last invocation.
            this.shuffleClient().pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
                // Argument 2 is the block id array, 3 the buffers, 4 the listener.
                String[] strArr = (String[]) invocationOnMock.getArguments()[2];
                this.pushedBlocks().$plus$plus$eq(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(strArr)));
                ManagedBuffer[] managedBufferArr = (ManagedBuffer[]) invocationOnMock.getArguments()[3];
                BlockPushingListener blockPushingListener = (BlockPushingListener) invocationOnMock.getArguments()[4];
                // Every request is expected to carry exactly two blocks.
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(strArr, "length", BoxesRunTime.boxToInteger(strArr.length), BoxesRunTime.boxToInteger(2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 252));
                // A block has already been withheld: acknowledge every (block id, buffer) pair of this request.
                if (((String) create.elem) != null) {
                    Tuple2Zipped$.MODULE$.foreach$extension(Tuple2Zipped$Ops$.MODULE$.zipped$extension(Predef$.MODULE$.tuple2ToZippedOps(new Tuple2(strArr, managedBufferArr)), strArr2 -> {
                        return new ArrayOps.ofRef($anonfun$new$24(strArr2));
                    }, managedBufferArr2 -> {
                        return new ArrayOps.ofRef($anonfun$new$25(managedBufferArr2));
                    }), (str, managedBuffer) -> {
                        blockPushingListener.onBlockPushSuccess(str, managedBuffer);
                        return BoxedUnit.UNIT;
                    });
                    return BoxedUnit.UNIT;
                }
                // First invocation: remember the second block and the listener, acknowledge only the first block.
                create.elem = strArr[1];
                create2.elem = blockPushingListener;
                blockPushingListener.onBlockPushSuccess(strArr[0], managedBufferArr[0]);
                return BoxedUnit.UNIT;
            });
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 2L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // While one block is still unacknowledged, only one request with two blocks has been sent.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(1))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks = this.pushedBlocks();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushedBlocks, "length", BoxesRunTime.boxToInteger(pushedBlocks.length()), BoxesRunTime.boxToInteger(2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 271));
            // Acknowledge the withheld block, then run the tasks that this queues up.
            ((BlockPushingListener) create2.elem).onBlockPushSuccess((String) create.elem, (ManagedBuffer) Mockito.mock(ManagedBuffer.class));
            testShuffleBlockPusher.runPendingTasks();
            // Mockito.times counts all invocations so far: four requests and eight blocks in total.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(4))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks2 = this.pushedBlocks();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushedBlocks2, "length", BoxesRunTime.boxToInteger(pushedBlocks2.length()), BoxesRunTime.boxToInteger(8), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 277));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
            ShuffleBlockPusher$.MODULE$.stop();
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 241));
        // Registers a test that sets spark.shuffle.push.maxBlockBatchSize to "1m", pushes one block of
        // 524288 bytes per partition, and expects four pushBlocks calls with every partition's block pushed.
        test("Number of shuffle blocks grouped in a single push request is limited by maxBlockBatchSize", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.conf().set("spark.shuffle.push.maxBlockBatchSize", "1m");
            // NOTE(review): helper body is outside this view; by its name it presumably stubs pushBlocks to succeed -- confirm.
            this.interceptPushedBlocksForSuccess();
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 524288L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // Exactly four pushBlocks invocations on the mocked shuffle client.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(4))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks = this.pushedBlocks();
            // The number of recorded pushed blocks must equal the partition count.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushedBlocks, "length", BoxesRunTime.boxToInteger(pushedBlocks.length()), BoxesRunTime.boxToInteger(this.dependency().partitioner().numPartitions()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 292));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
            ShuffleBlockPusher$.MODULE$.stop();
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 283));
        // Registers a test of the retry policy of the error handler created by ShuffleBlockPusher:
        // no retry for TOO_LATE_BLOCK_PUSH, TOO_OLD_ATTEMPT_PUSH and STALE_BLOCK_PUSH; retry for a
        // RuntimeException wrapping a ConnectException, for BLOCK_APPEND_COLLISION_DETECTED and for a plain Throwable.
        test("Error retries", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            ErrorHandler.BlockPushErrorHandler createErrorHandler = new ShuffleBlockPusher(this.conf()).createErrorHandler();
            // The next three assertions are negated (notBool): these failures must not be retried.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldRetryError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")), "errorHandler.shouldRetryError(new org.apache.spark.network.server.BlockPushNonFatalFailure(TOO_LATE_BLOCK_PUSH, \"\"))", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 300));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldRetryError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")), "errorHandler.shouldRetryError(new org.apache.spark.network.server.BlockPushNonFatalFailure(TOO_OLD_ATTEMPT_PUSH, \"\"))", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 303));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldRetryError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")), "errorHandler.shouldRetryError(new org.apache.spark.network.server.BlockPushNonFatalFailure(STALE_BLOCK_PUSH, \"\"))", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 306));
            // The remaining assertions are positive: these failures must be retried.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldRetryError(new RuntimeException(new ConnectException())), "errorHandler.shouldRetryError(new scala.`package`.RuntimeException(new java.net.ConnectException()))", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 309));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldRetryError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")), "errorHandler.shouldRetryError(new org.apache.spark.network.server.BlockPushNonFatalFailure(BLOCK_APPEND_COLLISION_DETECTED, \"\"))", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 310));
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldRetryError(new Throwable()), "errorHandler.shouldRetryError(new scala.`package`.Throwable())", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 313));
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 297));
        // Registers a test of the logging policy of the error handler created by ShuffleBlockPusher:
        // no logging for TOO_LATE_BLOCK_PUSH, TOO_OLD_ATTEMPT_PUSH, STALE_BLOCK_PUSH and
        // BLOCK_APPEND_COLLISION_DETECTED; logging for a plain Throwable.
        test("Error logging", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            ErrorHandler.BlockPushErrorHandler createErrorHandler = new ShuffleBlockPusher(this.conf()).createErrorHandler();
            // The next four assertions are negated (notBool): these failures must not be logged.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldLogError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")), "errorHandler.shouldLogError(new org.apache.spark.network.server.BlockPushNonFatalFailure(TOO_LATE_BLOCK_PUSH, \"\"))", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 319));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldLogError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")), "errorHandler.shouldLogError(new org.apache.spark.network.server.BlockPushNonFatalFailure(TOO_OLD_ATTEMPT_PUSH, \"\"))", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 322));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldLogError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")), "errorHandler.shouldLogError(new org.apache.spark.network.server.BlockPushNonFatalFailure(STALE_BLOCK_PUSH, \"\"))", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 325));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldLogError(new BlockPushNonFatalFailure(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")), "errorHandler.shouldLogError(new org.apache.spark.network.server.BlockPushNonFatalFailure(BLOCK_APPEND_COLLISION_DETECTED, \"\"))", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 328));
            // A generic Throwable must be logged.
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(createErrorHandler.shouldLogError(new Throwable()), "errorHandler.shouldLogError(new scala.`package`.Throwable())", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 330));
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 316));
        // Registers a test that sets spark.reducer.maxBlocksInFlightPerAddress to "1" and installs a custom
        // pushBlocks answer; it expects eight pushBlocks calls but only seven recorded pushed blocks.
        test("Blocks are continued to push even when a block push fails with collision exception", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.conf().set("spark.reducer.maxBlocksInFlightPerAddress", "1");
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            // Mutable flag shared with the answer below, initially true.
            BooleanRef create = BooleanRef.create(true);
            // Decompiled stubbing of a void-like call: the pushBlocks invocation on the mock must stay
            // immediately before Mockito.when(...), which attaches the answer to that last invocation.
            this.shuffleClient().pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            // NOTE(review): the answer body ($anonfun$new$33) is outside this view; going by the test name it
            // presumably fails one push with a collision failure and succeeds on the rest -- confirm.
            Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
                $anonfun$new$33(this, create, invocationOnMock);
                return BoxedUnit.UNIT;
            });
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 2L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // Exactly eight pushBlocks invocations on the mocked shuffle client.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(8))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks = this.pushedBlocks();
            // Expect seven recorded pushed blocks.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushedBlocks, "length", BoxesRunTime.boxToInteger(pushedBlocks.length()), BoxesRunTime.boxToInteger(7), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 359));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 334));
        // Registers a test that sets spark.reducer.maxBlocksInFlightPerAddress to "1" and installs a custom
        // pushBlocks answer; it expects a single pushBlocks call and no recorded pushed blocks.
        test("More blocks are not pushed when a block push fails with too late exception", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.conf().set("spark.reducer.maxBlocksInFlightPerAddress", "1");
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            // Mutable flag shared with the answer below, initially true.
            BooleanRef create = BooleanRef.create(true);
            // Decompiled stubbing of a void-like call: the pushBlocks invocation on the mock must stay
            // immediately before Mockito.when(...), which attaches the answer to that last invocation.
            this.shuffleClient().pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            // NOTE(review): the answer body ($anonfun$new$37) is outside this view; going by the test name it
            // presumably fails the push with a too-late failure -- confirm.
            Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
                $anonfun$new$37(this, create, invocationOnMock);
                return BoxedUnit.UNIT;
            });
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 2L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // Exactly one pushBlocks invocation on the mocked shuffle client.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(1))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks = this.pushedBlocks();
            // No block may have been recorded as pushed.
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(pushedBlocks, "isEmpty", pushedBlocks.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 389));
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 364));
        // Registers a test with two merger locations ("client1" and "client2") and
        // spark.reducer.maxBlocksInFlightPerAddress set to "2"; it expects two pushBlocks calls, four
        // recorded pushed blocks, and two entries in the pusher's unreachableBlockMgrs set.
        test("Connect exceptions remove all the push requests for that host", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            Mockito.when(this.dependency().getMergerLocs()).thenReturn(new $colon.colon(BlockManagerId$.MODULE$.apply("client1", "client1", 1, BlockManagerId$.MODULE$.apply$default$4()), new $colon.colon(BlockManagerId$.MODULE$.apply("client2", "client2", 2, BlockManagerId$.MODULE$.apply$default$4()), Nil$.MODULE$)));
            this.conf().set("spark.reducer.maxBlocksInFlightPerAddress", "2");
            // Decompiled stubbing of a void-like call: the pushBlocks invocation on the mock must stay
            // immediately before Mockito.when(...), which attaches the answer to that last invocation.
            this.shuffleClient().pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            // NOTE(review): the answer body ($anonfun$new$41) is outside this view; going by the test name it
            // presumably fails pushes with a connect exception -- confirm.
            Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
                $anonfun$new$41(this, invocationOnMock);
                return BoxedUnit.UNIT;
            });
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 2L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // Exactly two pushBlocks invocations on the mocked shuffle client.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(2))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            ArrayBuffer<String> pushedBlocks = this.pushedBlocks();
            // Expect four recorded pushed blocks.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(pushedBlocks, "length", BoxesRunTime.boxToInteger(pushedBlocks.length()), BoxesRunTime.boxToInteger(4), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 413));
            HashSet unreachableBlockMgrs = testShuffleBlockPusher.unreachableBlockMgrs();
            // Expect two entries in the unreachable set.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.lengthSizeMacroBool(unreachableBlockMgrs, "size", BoxesRunTime.boxToInteger(unreachableBlockMgrs.size()), BoxesRunTime.boxToInteger(2), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 414));
            this.verifyBlockPushCompleted(testShuffleBlockPusher);
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 392));
        // Registers the SPARK-36255 test: with two merger locations and spark.reducer.maxReqsInFlight set to
        // "1", it installs a custom pushBlocks answer and expects a single pushBlocks call, after which the
        // pusher's task queue must be empty.
        test("SPARK-36255: FileNotFoundException stops the push", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            Mockito.when(this.dependency().getMergerLocs()).thenReturn(new $colon.colon(BlockManagerId$.MODULE$.apply("client1", "client1", 1, BlockManagerId$.MODULE$.apply$default$4()), new $colon.colon(BlockManagerId$.MODULE$.apply("client2", "client2", 2, BlockManagerId$.MODULE$.apply$default$4()), Nil$.MODULE$)));
            this.conf().set("spark.reducer.maxReqsInFlight", "1");
            TestShuffleBlockPusher testShuffleBlockPusher = new TestShuffleBlockPusher(this, this.conf());
            // Decompiled stubbing of a void-like call: the pushBlocks invocation on the mock must stay
            // immediately before Mockito.when(...), which attaches the answer to that last invocation.
            this.shuffleClient().pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            // NOTE(review): the answer body ($anonfun$new$45) is outside this view; going by the test name it
            // presumably reports a FileNotFoundException to the listener -- confirm.
            Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
                $anonfun$new$45(invocationOnMock);
                return BoxedUnit.UNIT;
            });
            testShuffleBlockPusher.initiateBlockPush((File) Mockito.mock(File.class), (long[]) Array$.MODULE$.fill(this.dependency().partitioner().numPartitions(), () -> {
                return 2L;
            }, ClassTag$.MODULE$.Long()), this.dependency(), 0);
            testShuffleBlockPusher.runPendingTasks();
            // Exactly one pushBlocks invocation on the mocked shuffle client.
            ((BlockStoreClient) Mockito.verify(this.shuffleClient(), Mockito.times(1))).pushBlocks((String) ArgumentMatchers.any(), BoxesRunTime.unboxToInt(ArgumentMatchers.any()), (String[]) ArgumentMatchers.any(), (ManagedBuffer[]) ArgumentMatchers.any(), (BlockPushingListener) ArgumentMatchers.any());
            LinkedBlockingQueue<Runnable> tasks = testShuffleBlockPusher.tasks();
            // No further tasks may remain queued.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(tasks, "isEmpty", tasks.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 438));
            ShuffleBlockPusher$.MODULE$.stop();
        }, new Position("ShuffleBlockPusherSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 418));
    }
}
