package org.apache.hadoop.mapreduce.task.reduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathId;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapRFsOutputFile;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* JADX WARN: Classes with same name are omitted:
  input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleFetcher.class
 */
/* loaded from: input_file:hadoop-mapreduce-client-contrib-2.4.1-mapr-1408-tests.jar:org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleFetcher.class */
public class TestDirectShuffleFetcher {
    private static final Log LOG = LogFactory.getLog(TestDirectShuffleFetcher.class);
    PathId pathId;
    FileSystem rfs;
    JobConf job = null;
    TaskAttemptID id = null;
    DirectShuffleSchedulerImpl<Text, Text> ss = null;
    DirectShuffleMergeManagerImpl<Text, Text> mm = null;
    Reporter r = null;
    ShuffleClientMetrics metrics = null;
    ExceptionReporter except = null;
    Counters.Counter allErrs = null;
    final String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
    final MapHost host = new MapHost("localhost", "http://localhost:8080/");
    final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
    final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");

    @Rule
    public TestName name = new TestName();

    /* JADX WARN: Classes with same name are omitted:
      input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleFetcher$FakeDirectShuffleFetcher.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-contrib-2.4.1-mapr-1408-tests.jar:org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleFetcher$FakeDirectShuffleFetcher.class */
    public static class FakeDirectShuffleFetcher<K, V> extends DirectShuffleFetcher<K, V> {
        public FakeDirectShuffleFetcher(int i, JobConf jobConf, TaskAttemptID taskAttemptID, DirectShuffleSchedulerImpl<K, V> directShuffleSchedulerImpl, DirectShuffleMergeManagerImpl<K, V> directShuffleMergeManagerImpl, Reporter reporter, ShuffleClientMetrics shuffleClientMetrics, ExceptionReporter exceptionReporter, MapOutputFile mapOutputFile, FileSystem fileSystem) {
            super(i, jobConf, taskAttemptID, directShuffleSchedulerImpl, directShuffleMergeManagerImpl, reporter, shuffleClientMetrics, exceptionReporter, mapOutputFile);
            this.rfs = fileSystem;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleFetcher$FakeFSDataInputStream.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-contrib-2.4.1-mapr-1408-tests.jar:org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleFetcher$FakeFSDataInputStream.class */
    public static class FakeFSDataInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
        public FakeFSDataInputStream(byte[] bArr) {
            super(bArr);
        }

        public void seek(long j) throws IOException {
            this.pos = (int) j;
        }

        public long getPos() throws IOException {
            return this.pos;
        }

        public boolean seekToNewSource(long j) throws IOException {
            return false;
        }

        public int read(long j, byte[] bArr, int i, int i2) throws IOException {
            return 0;
        }

        public void readFully(long j, byte[] bArr, int i, int i2) throws IOException {
        }

        public void readFully(long j, byte[] bArr) throws IOException {
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleFetcher$StuckInputStream.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-contrib-2.4.1-mapr-1408-tests.jar:org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleFetcher$StuckInputStream.class */
    static class StuckInputStream extends FilterInputStream implements Seekable, PositionedReadable {
        boolean stuck;
        volatile boolean closed;
        int length;

        StuckInputStream(byte[] bArr) {
            super(new ByteArrayInputStream(bArr));
            this.stuck = false;
            this.closed = false;
            this.length = bArr.length;
        }

        int freeze() throws IOException {
            synchronized (this) {
                this.stuck = true;
                notify();
            }
            do {
                if (Thread.currentThread().isInterrupted() && !this.closed) {
                    return 0;
                }
            } while (!this.closed);
            throw new IOException("underlying stream closed, triggered an error");
        }

        @Override // java.io.FilterInputStream, java.io.InputStream
        public int read(byte[] bArr) throws IOException {
            int read = super.read(bArr);
            return read != -1 ? read : freeze();
        }

        @Override // java.io.FilterInputStream, java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            int read = super.read(bArr, i, i2);
            return read != -1 ? read : freeze();
        }

        @Override // java.io.FilterInputStream, java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.closed = true;
        }

        public synchronized void waitForFetcher() throws InterruptedException {
            while (!this.stuck) {
                wait();
            }
        }

        public boolean wasClosedProperly() {
            return this.closed;
        }

        public int read(long j, byte[] bArr, int i, int i2) throws IOException {
            int read = super.read(bArr, i, i2);
            return read != -1 ? read : freeze();
        }

        public void readFully(long j, byte[] bArr, int i, int i2) throws IOException {
            if (super.read(bArr, i, i2) != -1) {
                return;
            }
            freeze();
        }

        public void readFully(long j, byte[] bArr) throws IOException {
            if (super.read(bArr) != -1) {
                return;
            }
            freeze();
        }

        public void seek(long j) throws IOException {
            skip(j);
        }

        public long getPos() throws IOException {
            return Math.abs(available() - this.length);
        }

        public boolean seekToNewSource(long j) throws IOException {
            return false;
        }
    }

    @Before
    public void setup() {
        LOG.info(">>>> " + this.name.getMethodName());
        this.job = new JobConf();
        this.id = TaskAttemptID.forName("attempt_0_1_r_1_1");
        this.ss = (DirectShuffleSchedulerImpl) Mockito.mock(DirectShuffleSchedulerImpl.class);
        this.mm = (DirectShuffleMergeManagerImpl) Mockito.mock(DirectShuffleMergeManagerImpl.class);
        this.r = (Reporter) Mockito.mock(Reporter.class);
        this.metrics = (ShuffleClientMetrics) Mockito.mock(ShuffleClientMetrics.class);
        this.except = (ExceptionReporter) Mockito.mock(ExceptionReporter.class);
        this.rfs = (FileSystem) Mockito.mock(FileSystem.class);
        this.allErrs = (Counters.Counter) Mockito.mock(Counters.Counter.class);
        Mockito.when(this.r.getCounter(Mockito.anyString(), Mockito.anyString())).thenReturn(this.allErrs);
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(this.map1ID);
        arrayList.add(this.map2ID);
        this.pathId = (PathId) Mockito.mock(PathId.class);
        Mockito.when(this.pathId.getFid()).thenReturn("anypath");
        try {
            Mockito.when(this.ss.getLocation()).thenReturn(new MapOutputLocation(this.map1ID, "localhost", this.pathId));
        } catch (InterruptedException e) {
            Assert.fail();
        }
    }

    @After
    public void teardown() {
        LOG.info("<<<< " + this.name.getMethodName());
    }

    @Test
    public void testReduceOutOfDiskSpace() throws Throwable {
        LOG.info("testReduceOutOfDiskSpace");
        FakeDirectShuffleFetcher fakeDirectShuffleFetcher = new FakeDirectShuffleFetcher(1, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, (MapOutputFile) Mockito.mock(MapRFsOutputFile.class), this.rfs);
        long length = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long length2 = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(128);
        byteArrayOutputStream.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] bArr = {(byte) (length >>> 56), (byte) (length >>> 48), (byte) (length >>> 40), (byte) (length >>> 32), (byte) (length >>> 24), (byte) (length >>> 16), (byte) (length >>> 8), (byte) (length >>> 0)};
        byteArrayOutputStream.write(bArr);
        bArr[0] = (byte) (length2 >>> 56);
        bArr[1] = (byte) (length2 >>> 48);
        bArr[2] = (byte) (length2 >>> 40);
        bArr[3] = (byte) (length2 >>> 32);
        bArr[4] = (byte) (length2 >>> 24);
        bArr[5] = (byte) (length2 >>> 16);
        bArr[6] = (byte) (length2 >>> 8);
        bArr[7] = (byte) (length2 >>> 0);
        byteArrayOutputStream.write(bArr);
        final byte[] byteArray = byteArrayOutputStream.toByteArray();
        try {
            Mockito.when(this.rfs.openFid2((PathId) Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn(new FSDataInputStream(new FakeFSDataInputStream(byteArrayOutputStream.toByteArray())) { // from class: org.apache.hadoop.mapreduce.task.reduce.TestDirectShuffleFetcher.1
                public long getFileLength() {
                    return byteArray.length;
                }
            });
        } catch (IOException e) {
            Assert.fail();
        }
        Mockito.when(this.mm.reserve((TaskAttemptID) Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenThrow(new Throwable[]{new IOException("No disk space available")});
        fakeDirectShuffleFetcher.copyOutput(this.ss.getLocation());
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss)).reportLocalError((IOException) Mockito.any(IOException.class));
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss)).addKnownMapOutput("localhost", this.map1ID, this.pathId);
    }

    @Test
    public void testCopyFailed() throws Exception {
        LOG.info("testCopyFailed");
        FakeDirectShuffleFetcher fakeDirectShuffleFetcher = new FakeDirectShuffleFetcher(1, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, (MapOutputFile) Mockito.mock(MapRFsOutputFile.class), this.rfs);
        Mockito.when(this.rfs.openFid2((PathId) Mockito.any(PathId.class), (String) Mockito.any(String.class), Mockito.anyInt())).thenThrow(new Throwable[]{new FileNotFoundException()});
        MapOutputLocation location = this.ss.getLocation();
        fakeDirectShuffleFetcher.copyOutput(location);
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss)).copyFailed(this.map1ID, location);
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss)).addKnownMapOutput("localhost", this.map1ID, this.pathId);
        ((ExceptionReporter) Mockito.verify(this.except, Mockito.never())).reportException((Throwable) Mockito.any(IOException.class));
    }

    @Test(timeout = 10000)
    public void testCopyFromHostWait() throws Exception {
        long length = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long length2 = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(128);
        byteArrayOutputStream.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] bArr = {(byte) (length >>> 56), (byte) (length >>> 48), (byte) (length >>> 40), (byte) (length >>> 32), (byte) (length >>> 24), (byte) (length >>> 16), (byte) (length >>> 8), (byte) (length >>> 0)};
        byteArrayOutputStream.write(bArr);
        bArr[0] = (byte) (length2 >>> 56);
        bArr[1] = (byte) (length2 >>> 48);
        bArr[2] = (byte) (length2 >>> 40);
        bArr[3] = (byte) (length2 >>> 32);
        bArr[4] = (byte) (length2 >>> 24);
        bArr[5] = (byte) (length2 >>> 16);
        bArr[6] = (byte) (length2 >>> 8);
        bArr[7] = (byte) (length2 >>> 0);
        byteArrayOutputStream.write(bArr);
        final byte[] byteArray = byteArrayOutputStream.toByteArray();
        try {
            Mockito.when(this.rfs.openFid2((PathId) Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn(new FSDataInputStream(new FakeFSDataInputStream(byteArrayOutputStream.toByteArray())) { // from class: org.apache.hadoop.mapreduce.task.reduce.TestDirectShuffleFetcher.2
                public long getFileLength() {
                    return byteArray.length;
                }
            });
        } catch (IOException e) {
            Assert.fail();
        }
        final DirectInMemoryOutput directInMemoryOutput = (DirectInMemoryOutput) Mockito.spy(new DirectInMemoryOutput(this.job, this.id, this.mm, 100, null, true));
        Mockito.when(this.mm.unconditionalReserve(this.map1ID, 2000000L, true)).thenCallRealMethod();
        Mockito.when(this.mm.reserve((TaskAttemptID) Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenCallRealMethod();
        Mockito.when(Boolean.valueOf(this.mm.canShuffleToMemory(Mockito.anyLong()))).thenReturn(true);
        ((DirectShuffleMergeManagerImpl) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.hadoop.mapreduce.task.reduce.TestDirectShuffleFetcher.3
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m16answer(InvocationOnMock invocationOnMock) throws Throwable {
                invocationOnMock.callRealMethod();
                return null;
            }
        }).when(this.mm)).unreserve(2000000L);
        final FakeDirectShuffleFetcher fakeDirectShuffleFetcher = new FakeDirectShuffleFetcher(0, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, (MapOutputFile) Mockito.mock(MapRFsOutputFile.class), this.rfs);
        this.mm.unconditionalReserve(this.map1ID, 2000000L, true);
        final MapOutputLocation location = this.ss.getLocation();
        Thread thread = new Thread(new Runnable() { // from class: org.apache.hadoop.mapreduce.task.reduce.TestDirectShuffleFetcher.4
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Mockito.when(TestDirectShuffleFetcher.this.mm.unconditionalReserve(TestDirectShuffleFetcher.this.map1ID, 36L, true)).thenReturn(directInMemoryOutput);
                    fakeDirectShuffleFetcher.copyOutput(location);
                } catch (IOException e2) {
                    Assert.fail();
                } catch (InterruptedException e3) {
                    Assert.fail();
                }
            }
        });
        thread.start();
        Thread.sleep(1000L);
        this.mm.unreserve(2000000L);
        thread.join();
        ((Counters.Counter) Mockito.verify(this.allErrs, Mockito.times(1))).increment(1L);
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss, Mockito.times(1))).copyFailed(this.map1ID, location);
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss)).addKnownMapOutput("localhost", this.map1ID, this.pathId);
    }

    @Test(timeout = 10000)
    public void testCopyFromHostCompressFailure() throws Exception {
        long length = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long length2 = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(128);
        byteArrayOutputStream.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] bArr = {(byte) (length >>> 56), (byte) (length >>> 48), (byte) (length >>> 40), (byte) (length >>> 32), (byte) (length >>> 24), (byte) (length >>> 16), (byte) (length >>> 8), (byte) (length >>> 0)};
        byteArrayOutputStream.write(bArr);
        bArr[0] = (byte) (length2 >>> 56);
        bArr[1] = (byte) (length2 >>> 48);
        bArr[2] = (byte) (length2 >>> 40);
        bArr[3] = (byte) (length2 >>> 32);
        bArr[4] = (byte) (length2 >>> 24);
        bArr[5] = (byte) (length2 >>> 16);
        bArr[6] = (byte) (length2 >>> 8);
        bArr[7] = (byte) (length2 >>> 0);
        byteArrayOutputStream.write(bArr);
        final byte[] byteArray = byteArrayOutputStream.toByteArray();
        try {
            Mockito.when(this.rfs.openFid2((PathId) Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn(new FSDataInputStream(new FakeFSDataInputStream(byteArrayOutputStream.toByteArray())) { // from class: org.apache.hadoop.mapreduce.task.reduce.TestDirectShuffleFetcher.5
                public long getFileLength() {
                    return byteArray.length;
                }
            });
        } catch (IOException e) {
            Assert.fail();
        }
        DirectInMemoryOutput directInMemoryOutput = (DirectInMemoryOutput) Mockito.mock(DirectInMemoryOutput.class);
        FakeDirectShuffleFetcher fakeDirectShuffleFetcher = new FakeDirectShuffleFetcher(0, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, (MapOutputFile) Mockito.mock(MapRFsOutputFile.class), this.rfs);
        Mockito.when(this.mm.reserve((TaskAttemptID) Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn(directInMemoryOutput);
        ((DirectInMemoryOutput) Mockito.doThrow(new InternalError()).when(directInMemoryOutput)).shuffle((MapHost) Mockito.any(MapHost.class), (InputStream) Mockito.any(InputStream.class), Mockito.anyLong(), Mockito.anyLong(), (ShuffleClientMetrics) Mockito.any(ShuffleClientMetrics.class), (Reporter) Mockito.any(Reporter.class));
        MapOutputLocation location = this.ss.getLocation();
        fakeDirectShuffleFetcher.copyOutput(location);
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss, Mockito.times(1))).copyFailed(this.map1ID, location);
    }

    @Test(timeout = 10000)
    public void testInterruptInMemory() throws Exception {
        DirectInMemoryOutput directInMemoryOutput = (DirectInMemoryOutput) Mockito.spy(new DirectInMemoryOutput(this.job, this.id, this.mm, 100, null, true));
        Mockito.when(this.mm.reserve((TaskAttemptID) Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn(directInMemoryOutput);
        ((DirectShuffleMergeManagerImpl) Mockito.doNothing().when(this.mm)).waitForResource();
        long length = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long length2 = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(128);
        byteArrayOutputStream.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] bArr = {(byte) (length >>> 56), (byte) (length >>> 48), (byte) (length >>> 40), (byte) (length >>> 32), (byte) (length >>> 24), (byte) (length >>> 16), (byte) (length >>> 8), (byte) (length >>> 0)};
        byteArrayOutputStream.write(bArr);
        bArr[0] = (byte) (length2 >>> 56);
        bArr[1] = (byte) (length2 >>> 48);
        bArr[2] = (byte) (length2 >>> 40);
        bArr[3] = (byte) (length2 >>> 32);
        bArr[4] = (byte) (length2 >>> 24);
        bArr[5] = (byte) (length2 >>> 16);
        bArr[6] = (byte) (length2 >>> 8);
        bArr[7] = (byte) (length2 >>> 0);
        byteArrayOutputStream.write(bArr);
        final byte[] byteArray = byteArrayOutputStream.toByteArray();
        StuckInputStream stuckInputStream = new StuckInputStream(byteArrayOutputStream.toByteArray());
        try {
            Mockito.when(this.rfs.openFid2((PathId) Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn(new FSDataInputStream(stuckInputStream) { // from class: org.apache.hadoop.mapreduce.task.reduce.TestDirectShuffleFetcher.6
                public long getFileLength() {
                    return byteArray.length;
                }
            });
        } catch (IOException e) {
            Assert.fail();
        }
        FakeDirectShuffleFetcher fakeDirectShuffleFetcher = new FakeDirectShuffleFetcher(2, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, (MapOutputFile) Mockito.mock(MapRFsOutputFile.class), this.rfs);
        fakeDirectShuffleFetcher.start();
        stuckInputStream.waitForFetcher();
        fakeDirectShuffleFetcher.shutDown();
        fakeDirectShuffleFetcher.join();
        Assert.assertTrue(stuckInputStream.wasClosedProperly());
        ((DirectInMemoryOutput) Mockito.verify(directInMemoryOutput)).abort();
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss)).copyFailed((TaskAttemptID) Mockito.any(TaskAttemptID.class), (MapOutputLocation) Mockito.any(MapOutputLocation.class));
    }

    @Test(timeout = 10000)
    public void testInterruptOnDisk() throws Exception {
        Path path = new Path("file:///tmp/foo");
        Path tempPath = DirectOnDiskMapOutput.getTempPath(path, 7);
        FileSystem fileSystem = (FileSystem) Mockito.mock(FileSystem.class, Mockito.RETURNS_DEEP_STUBS);
        MapOutputFile mapOutputFile = (MapOutputFile) Mockito.mock(MapRFsOutputFile.class);
        Mockito.when(mapOutputFile.getInputFileForWrite((TaskID) Mockito.any(TaskID.class), Mockito.anyLong())).thenReturn(path);
        DirectOnDiskMapOutput directOnDiskMapOutput = (DirectOnDiskMapOutput) Mockito.spy(new DirectOnDiskMapOutput(this.map1ID, this.id, this.mm, 100L, this.job, mapOutputFile, 7, true, fileSystem, path));
        Mockito.when(this.mm.reserve((TaskAttemptID) Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn(directOnDiskMapOutput);
        ((DirectShuffleMergeManagerImpl) Mockito.doNothing().when(this.mm)).waitForResource();
        long length = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long length2 = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(128);
        byteArrayOutputStream.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] bArr = {(byte) (length >>> 56), (byte) (length >>> 48), (byte) (length >>> 40), (byte) (length >>> 32), (byte) (length >>> 24), (byte) (length >>> 16), (byte) (length >>> 8), (byte) (length >>> 0)};
        byteArrayOutputStream.write(bArr);
        bArr[0] = (byte) (length2 >>> 56);
        bArr[1] = (byte) (length2 >>> 48);
        bArr[2] = (byte) (length2 >>> 40);
        bArr[3] = (byte) (length2 >>> 32);
        bArr[4] = (byte) (length2 >>> 24);
        bArr[5] = (byte) (length2 >>> 16);
        bArr[6] = (byte) (length2 >>> 8);
        bArr[7] = (byte) (length2 >>> 0);
        byteArrayOutputStream.write(bArr);
        final byte[] byteArray = byteArrayOutputStream.toByteArray();
        StuckInputStream stuckInputStream = new StuckInputStream(byteArrayOutputStream.toByteArray());
        try {
            Mockito.when(this.rfs.openFid2((PathId) Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn(new FSDataInputStream(stuckInputStream) { // from class: org.apache.hadoop.mapreduce.task.reduce.TestDirectShuffleFetcher.7
                public long getFileLength() {
                    return byteArray.length;
                }
            });
        } catch (IOException e) {
            Assert.fail();
        }
        FakeDirectShuffleFetcher fakeDirectShuffleFetcher = new FakeDirectShuffleFetcher(7, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, mapOutputFile, this.rfs);
        fakeDirectShuffleFetcher.start();
        stuckInputStream.waitForFetcher();
        fakeDirectShuffleFetcher.shutDown();
        fakeDirectShuffleFetcher.join();
        Assert.assertTrue(stuckInputStream.wasClosedProperly());
        ((FileSystem) Mockito.verify(fileSystem)).create((Path) Mockito.eq(tempPath));
        ((FileSystem) Mockito.verify(fileSystem)).delete((Path) Mockito.eq(tempPath), Mockito.eq(false));
        ((DirectOnDiskMapOutput) Mockito.verify(directOnDiskMapOutput)).abort();
        ((DirectShuffleSchedulerImpl) Mockito.verify(this.ss)).copyFailed((TaskAttemptID) Mockito.any(TaskAttemptID.class), (MapOutputLocation) Mockito.any(MapOutputLocation.class));
    }
}
