/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathId;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapRFsOutputFile;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.DirectInMemoryOutput;
import org.apache.hadoop.mapreduce.task.reduce.DirectOnDiskMapOutput;
import org.apache.hadoop.mapreduce.task.reduce.DirectShuffleFetcher;
import org.apache.hadoop.mapreduce.task.reduce.DirectShuffleMergeManagerImpl;
import org.apache.hadoop.mapreduce.task.reduce.DirectShuffleSchedulerImpl;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.MapHost;
import org.apache.hadoop.mapreduce.task.reduce.MapOutputLocation;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleClientMetrics;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDirectShuffleFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(TestDirectShuffleFetcher.class);
    JobConf job = null;
    TaskAttemptID id = null;
    DirectShuffleSchedulerImpl<Text, Text> ss = null;
    DirectShuffleMergeManagerImpl<Text, Text> mm = null;
    Reporter r = null;
    ShuffleClientMetrics metrics = null;
    ExceptionReporter except = null;
    Counters.Counter allErrs = null;
    PathId pathId;
    FileSystem rfs;
    final String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
    final MapHost host = new MapHost("localhost", "http://localhost:8080/");
    final TaskAttemptID map1ID = TaskAttemptID.forName((String)"attempt_0_1_m_1_1");
    final TaskAttemptID map2ID = TaskAttemptID.forName((String)"attempt_0_1_m_2_1");
    @Rule
    public TestName name = new TestName();

    @Before
    public void setup() {
        LOG.info(">>>> " + this.name.getMethodName());
        this.job = new JobConf();
        this.id = TaskAttemptID.forName((String)"attempt_0_1_r_1_1");
        this.ss = (DirectShuffleSchedulerImpl)Mockito.mock(DirectShuffleSchedulerImpl.class);
        this.mm = (DirectShuffleMergeManagerImpl)Mockito.mock(DirectShuffleMergeManagerImpl.class);
        this.r = (Reporter)Mockito.mock(Reporter.class);
        this.metrics = (ShuffleClientMetrics)Mockito.mock(ShuffleClientMetrics.class);
        this.except = (ExceptionReporter)Mockito.mock(ExceptionReporter.class);
        this.rfs = (FileSystem)Mockito.mock(FileSystem.class);
        this.allErrs = (Counters.Counter)Mockito.mock(Counters.Counter.class);
        Mockito.when((Object)this.r.getCounter(Mockito.anyString(), Mockito.anyString())).thenReturn((Object)this.allErrs);
        ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
        maps.add(this.map1ID);
        maps.add(this.map2ID);
        this.pathId = (PathId)Mockito.mock(PathId.class);
        Mockito.when((Object)this.pathId.getFid()).thenReturn((Object)"anypath");
        MapOutputLocation loc = new MapOutputLocation(this.map1ID, "localhost", this.pathId);
        try {
            Mockito.when((Object)this.ss.getLocation()).thenReturn((Object)loc);
        }
        catch (InterruptedException e) {
            Assert.fail();
        }
    }

    @After
    public void teardown() {
        LOG.info("<<<< " + this.name.getMethodName());
    }

    @Test
    public void testReduceOutOfDiskSpace() throws Throwable {
        LOG.info("testReduceOutOfDiskSpace");
        MapOutputFile mof = (MapOutputFile)Mockito.mock(MapRFsOutputFile.class);
        FakeDirectShuffleFetcher<Text, Text> underTest = new FakeDirectShuffleFetcher<Text, Text>(1, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, mof, this.rfs);
        String content = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n";
        long mOutL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long dL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream bos = new ByteArrayOutputStream(128);
        bos.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] writeBuffer = new byte[]{(byte)(mOutL >>> 56), (byte)(mOutL >>> 48), (byte)(mOutL >>> 40), (byte)(mOutL >>> 32), (byte)(mOutL >>> 24), (byte)(mOutL >>> 16), (byte)(mOutL >>> 8), (byte)(mOutL >>> 0)};
        bos.write(writeBuffer);
        writeBuffer[0] = (byte)(dL >>> 56);
        writeBuffer[1] = (byte)(dL >>> 48);
        writeBuffer[2] = (byte)(dL >>> 40);
        writeBuffer[3] = (byte)(dL >>> 32);
        writeBuffer[4] = (byte)(dL >>> 24);
        writeBuffer[5] = (byte)(dL >>> 16);
        writeBuffer[6] = (byte)(dL >>> 8);
        writeBuffer[7] = (byte)(dL >>> 0);
        bos.write(writeBuffer);
        final byte[] myB = bos.toByteArray();
        FakeFSDataInputStream in = new FakeFSDataInputStream(bos.toByteArray());
        FSDataInputStream fsin = new FSDataInputStream(in){

            public long getFileLength() {
                return myB.length;
            }
        };
        try {
            Mockito.when((Object)this.rfs.openFid2((PathId)Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn((Object)fsin);
        }
        catch (IOException e1) {
            Assert.fail();
        }
        Mockito.when((Object)this.mm.reserve((TaskAttemptID)Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenThrow(new Throwable[]{new IOException("No disk space available")});
        MapOutputLocation loc = this.ss.getLocation();
        underTest.copyOutput(loc);
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss)).reportLocalError((IOException)Mockito.any(IOException.class));
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss)).addKnownMapOutput("localhost", this.map1ID, this.pathId);
    }

    @Test
    public void testCopyFailed() throws Exception {
        LOG.info("testCopyFailed");
        MapOutputFile mof = (MapOutputFile)Mockito.mock(MapRFsOutputFile.class);
        FakeDirectShuffleFetcher<Text, Text> underTest = new FakeDirectShuffleFetcher<Text, Text>(1, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, mof, this.rfs);
        Mockito.when((Object)this.rfs.openFid2((PathId)Mockito.any(PathId.class), (String)Mockito.any(String.class), Mockito.anyInt())).thenThrow(new Throwable[]{new FileNotFoundException()});
        MapOutputLocation loc = this.ss.getLocation();
        underTest.copyOutput(loc);
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss)).copyFailed(this.map1ID, loc);
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss)).addKnownMapOutput("localhost", this.map1ID, this.pathId);
        ((ExceptionReporter)Mockito.verify((Object)this.except, (VerificationMode)Mockito.never())).reportException((Throwable)Mockito.any(IOException.class));
    }

    @Test(timeout=10000L)
    public void testCopyFromHostWait() throws Exception {
        String content = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n";
        long mOutL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long dL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream bos = new ByteArrayOutputStream(128);
        bos.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] writeBuffer = new byte[]{(byte)(mOutL >>> 56), (byte)(mOutL >>> 48), (byte)(mOutL >>> 40), (byte)(mOutL >>> 32), (byte)(mOutL >>> 24), (byte)(mOutL >>> 16), (byte)(mOutL >>> 8), (byte)(mOutL >>> 0)};
        bos.write(writeBuffer);
        writeBuffer[0] = (byte)(dL >>> 56);
        writeBuffer[1] = (byte)(dL >>> 48);
        writeBuffer[2] = (byte)(dL >>> 40);
        writeBuffer[3] = (byte)(dL >>> 32);
        writeBuffer[4] = (byte)(dL >>> 24);
        writeBuffer[5] = (byte)(dL >>> 16);
        writeBuffer[6] = (byte)(dL >>> 8);
        writeBuffer[7] = (byte)(dL >>> 0);
        bos.write(writeBuffer);
        final byte[] myB = bos.toByteArray();
        FakeFSDataInputStream in = new FakeFSDataInputStream(bos.toByteArray());
        FSDataInputStream fsin = new FSDataInputStream(in){

            public long getFileLength() {
                return myB.length;
            }
        };
        try {
            Mockito.when((Object)this.rfs.openFid2((PathId)Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn((Object)fsin);
        }
        catch (IOException e1) {
            Assert.fail();
        }
        final DirectInMemoryOutput immo = (DirectInMemoryOutput)Mockito.spy((Object)new DirectInMemoryOutput((Configuration)this.job, this.id, this.mm, 100, null, true));
        Mockito.when((Object)this.mm.unconditionalReserve(this.map1ID, 2000000L, true)).thenCallRealMethod();
        Mockito.when((Object)this.mm.reserve((TaskAttemptID)Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenCallRealMethod();
        Mockito.when((Object)this.mm.canShuffleToMemory(Mockito.anyLong())).thenReturn((Object)true);
        ((DirectShuffleMergeManagerImpl)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock ignore) throws Throwable {
                ignore.callRealMethod();
                return null;
            }
        }).when(this.mm)).unreserve(2000000L);
        MapOutputFile mof = (MapOutputFile)Mockito.mock(MapRFsOutputFile.class);
        final FakeDirectShuffleFetcher<Text, Text> underTest = new FakeDirectShuffleFetcher<Text, Text>(0, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, mof, this.rfs);
        this.mm.unconditionalReserve(this.map1ID, 2000000L, true);
        final MapOutputLocation loc = this.ss.getLocation();
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    Mockito.when((Object)TestDirectShuffleFetcher.this.mm.unconditionalReserve(TestDirectShuffleFetcher.this.map1ID, 36L, true)).thenReturn((Object)immo);
                    underTest.copyOutput(loc);
                }
                catch (IOException e) {
                    Assert.fail();
                }
                catch (InterruptedException e) {
                    Assert.fail();
                }
            }
        });
        t.start();
        Thread.sleep(1000L);
        this.mm.unreserve(2000000L);
        t.join();
        ((Counters.Counter)Mockito.verify((Object)this.allErrs, (VerificationMode)Mockito.times((int)1))).increment(1L);
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss, (VerificationMode)Mockito.times((int)1))).copyFailed(this.map1ID, loc);
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss)).addKnownMapOutput("localhost", this.map1ID, this.pathId);
    }

    @Test(timeout=10000L)
    public void testCopyFromHostCompressFailure() throws Exception {
        String content = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n";
        long mOutL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long dL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream bos = new ByteArrayOutputStream(128);
        bos.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] writeBuffer = new byte[]{(byte)(mOutL >>> 56), (byte)(mOutL >>> 48), (byte)(mOutL >>> 40), (byte)(mOutL >>> 32), (byte)(mOutL >>> 24), (byte)(mOutL >>> 16), (byte)(mOutL >>> 8), (byte)(mOutL >>> 0)};
        bos.write(writeBuffer);
        writeBuffer[0] = (byte)(dL >>> 56);
        writeBuffer[1] = (byte)(dL >>> 48);
        writeBuffer[2] = (byte)(dL >>> 40);
        writeBuffer[3] = (byte)(dL >>> 32);
        writeBuffer[4] = (byte)(dL >>> 24);
        writeBuffer[5] = (byte)(dL >>> 16);
        writeBuffer[6] = (byte)(dL >>> 8);
        writeBuffer[7] = (byte)(dL >>> 0);
        bos.write(writeBuffer);
        final byte[] myB = bos.toByteArray();
        FakeFSDataInputStream in = new FakeFSDataInputStream(bos.toByteArray());
        FSDataInputStream fsin = new FSDataInputStream(in){

            public long getFileLength() {
                return myB.length;
            }
        };
        try {
            Mockito.when((Object)this.rfs.openFid2((PathId)Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn((Object)fsin);
        }
        catch (IOException e1) {
            Assert.fail();
        }
        DirectInMemoryOutput immo = (DirectInMemoryOutput)Mockito.mock(DirectInMemoryOutput.class);
        MapOutputFile mof = (MapOutputFile)Mockito.mock(MapRFsOutputFile.class);
        FakeDirectShuffleFetcher<Text, Text> underTest = new FakeDirectShuffleFetcher<Text, Text>(0, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, mof, this.rfs);
        Mockito.when((Object)this.mm.reserve((TaskAttemptID)Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn((Object)immo);
        ((DirectInMemoryOutput)Mockito.doThrow((Throwable)new InternalError()).when((Object)immo)).shuffle((MapHost)Mockito.any(MapHost.class), (InputStream)Mockito.any(InputStream.class), Mockito.anyLong(), Mockito.anyLong(), (ShuffleClientMetrics)Mockito.any(ShuffleClientMetrics.class), (Reporter)Mockito.any(Reporter.class));
        MapOutputLocation loc = this.ss.getLocation();
        underTest.copyOutput(loc);
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss, (VerificationMode)Mockito.times((int)1))).copyFailed(this.map1ID, loc);
    }

    @Test(timeout=10000L)
    public void testInterruptInMemory() throws Exception {
        int FETCHER = 2;
        DirectInMemoryOutput immo = (DirectInMemoryOutput)Mockito.spy((Object)new DirectInMemoryOutput((Configuration)this.job, this.id, this.mm, 100, null, true));
        Mockito.when((Object)this.mm.reserve((TaskAttemptID)Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn((Object)immo);
        ((DirectShuffleMergeManagerImpl)Mockito.doNothing().when(this.mm)).waitForResource();
        String content = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n";
        long mOutL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long dL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream bos = new ByteArrayOutputStream(128);
        bos.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] writeBuffer = new byte[]{(byte)(mOutL >>> 56), (byte)(mOutL >>> 48), (byte)(mOutL >>> 40), (byte)(mOutL >>> 32), (byte)(mOutL >>> 24), (byte)(mOutL >>> 16), (byte)(mOutL >>> 8), (byte)(mOutL >>> 0)};
        bos.write(writeBuffer);
        writeBuffer[0] = (byte)(dL >>> 56);
        writeBuffer[1] = (byte)(dL >>> 48);
        writeBuffer[2] = (byte)(dL >>> 40);
        writeBuffer[3] = (byte)(dL >>> 32);
        writeBuffer[4] = (byte)(dL >>> 24);
        writeBuffer[5] = (byte)(dL >>> 16);
        writeBuffer[6] = (byte)(dL >>> 8);
        writeBuffer[7] = (byte)(dL >>> 0);
        bos.write(writeBuffer);
        final byte[] myB = bos.toByteArray();
        StuckInputStream in = new StuckInputStream(bos.toByteArray());
        FSDataInputStream fsin = new FSDataInputStream(in){

            public long getFileLength() {
                return myB.length;
            }
        };
        try {
            Mockito.when((Object)this.rfs.openFid2((PathId)Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn((Object)fsin);
        }
        catch (IOException e1) {
            Assert.fail();
        }
        MapOutputFile mof = (MapOutputFile)Mockito.mock(MapRFsOutputFile.class);
        FakeDirectShuffleFetcher<Text, Text> underTest = new FakeDirectShuffleFetcher<Text, Text>(2, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, mof, this.rfs);
        underTest.start();
        in.waitForFetcher();
        underTest.shutDown();
        underTest.join();
        Assert.assertTrue((boolean)in.wasClosedProperly());
        ((DirectInMemoryOutput)Mockito.verify((Object)immo)).abort();
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss)).copyFailed((TaskAttemptID)Mockito.any(TaskAttemptID.class), (MapOutputLocation)Mockito.any(MapOutputLocation.class));
    }

    @Test(timeout=10000L)
    public void testInterruptOnDisk() throws Exception {
        int FETCHER = 7;
        Path p = new Path("file:///tmp/foo");
        Path pTmp = DirectOnDiskMapOutput.getTempPath((Path)p, (int)7);
        FileSystem mFs = (FileSystem)Mockito.mock(FileSystem.class, (Answer)Mockito.RETURNS_DEEP_STUBS);
        MapOutputFile mof = (MapOutputFile)Mockito.mock(MapRFsOutputFile.class);
        Mockito.when((Object)mof.getInputFileForWrite((TaskID)Mockito.any(TaskID.class), Mockito.anyLong())).thenReturn((Object)p);
        DirectOnDiskMapOutput odmo = (DirectOnDiskMapOutput)Mockito.spy((Object)new DirectOnDiskMapOutput(this.map1ID, this.id, this.mm, 100L, this.job, mof, 7, true, mFs, p));
        Mockito.when((Object)this.mm.reserve((TaskAttemptID)Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn((Object)odmo);
        ((DirectShuffleMergeManagerImpl)Mockito.doNothing().when(this.mm)).waitForResource();
        String content = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n";
        long mOutL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        long dL = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes().length;
        ByteArrayOutputStream bos = new ByteArrayOutputStream(128);
        bos.write("\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
        byte[] writeBuffer = new byte[]{(byte)(mOutL >>> 56), (byte)(mOutL >>> 48), (byte)(mOutL >>> 40), (byte)(mOutL >>> 32), (byte)(mOutL >>> 24), (byte)(mOutL >>> 16), (byte)(mOutL >>> 8), (byte)(mOutL >>> 0)};
        bos.write(writeBuffer);
        writeBuffer[0] = (byte)(dL >>> 56);
        writeBuffer[1] = (byte)(dL >>> 48);
        writeBuffer[2] = (byte)(dL >>> 40);
        writeBuffer[3] = (byte)(dL >>> 32);
        writeBuffer[4] = (byte)(dL >>> 24);
        writeBuffer[5] = (byte)(dL >>> 16);
        writeBuffer[6] = (byte)(dL >>> 8);
        writeBuffer[7] = (byte)(dL >>> 0);
        bos.write(writeBuffer);
        final byte[] myB = bos.toByteArray();
        StuckInputStream in = new StuckInputStream(bos.toByteArray());
        FSDataInputStream fsin = new FSDataInputStream(in){

            public long getFileLength() {
                return myB.length;
            }
        };
        try {
            Mockito.when((Object)this.rfs.openFid2((PathId)Mockito.any(PathId.class), Mockito.anyString(), Mockito.anyInt())).thenReturn((Object)fsin);
        }
        catch (IOException e1) {
            Assert.fail();
        }
        FakeDirectShuffleFetcher<Text, Text> underTest = new FakeDirectShuffleFetcher<Text, Text>(7, this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, mof, this.rfs);
        underTest.start();
        in.waitForFetcher();
        underTest.shutDown();
        underTest.join();
        Assert.assertTrue((boolean)in.wasClosedProperly());
        ((FileSystem)Mockito.verify((Object)mFs)).create((Path)Mockito.eq((Object)pTmp));
        ((FileSystem)Mockito.verify((Object)mFs)).delete((Path)Mockito.eq((Object)pTmp), Mockito.eq((boolean)false));
        ((DirectOnDiskMapOutput)Mockito.verify((Object)odmo)).abort();
        ((DirectShuffleSchedulerImpl)Mockito.verify(this.ss)).copyFailed((TaskAttemptID)Mockito.any(TaskAttemptID.class), (MapOutputLocation)Mockito.any(MapOutputLocation.class));
    }

    public static class FakeFSDataInputStream
    extends ByteArrayInputStream
    implements Seekable,
    PositionedReadable {
        public FakeFSDataInputStream(byte[] buf) {
            super(buf);
        }

        public void seek(long pos) throws IOException {
            this.pos = (int)pos;
        }

        public long getPos() throws IOException {
            return this.pos;
        }

        public boolean seekToNewSource(long targetPos) throws IOException {
            return false;
        }

        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
            return 0;
        }

        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
        }

        public void readFully(long position, byte[] buffer) throws IOException {
        }
    }

    static class StuckInputStream
    extends FilterInputStream
    implements Seekable,
    PositionedReadable {
        boolean stuck = false;
        volatile boolean closed = false;
        int length;

        StuckInputStream(byte[] barray) {
            super(new ByteArrayInputStream(barray));
            this.length = barray.length;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        int freeze() throws IOException {
            StuckInputStream stuckInputStream = this;
            synchronized (stuckInputStream) {
                this.stuck = true;
                this.notify();
            }
            while (!Thread.currentThread().isInterrupted() || this.closed) {
                if (!this.closed) continue;
                throw new IOException("underlying stream closed, triggered an error");
            }
            return 0;
        }

        @Override
        public int read(byte[] b) throws IOException {
            int ret = super.read(b);
            if (ret != -1) {
                return ret;
            }
            return this.freeze();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int ret = super.read(b, off, len);
            if (ret != -1) {
                return ret;
            }
            return this.freeze();
        }

        @Override
        public void close() throws IOException {
            this.closed = true;
        }

        public synchronized void waitForFetcher() throws InterruptedException {
            while (!this.stuck) {
                this.wait();
            }
        }

        public boolean wasClosedProperly() {
            return this.closed;
        }

        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
            int ret = super.read(buffer, offset, length);
            if (ret != -1) {
                return ret;
            }
            return this.freeze();
        }

        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
            int ret = super.read(buffer, offset, length);
            if (ret != -1) {
                return;
            }
            this.freeze();
        }

        public void readFully(long position, byte[] buffer) throws IOException {
            int ret = super.read(buffer);
            if (ret != -1) {
                return;
            }
            this.freeze();
        }

        public void seek(long pos) throws IOException {
            this.skip(pos);
        }

        public long getPos() throws IOException {
            return Math.abs(this.available() - this.length);
        }

        public boolean seekToNewSource(long targetPos) throws IOException {
            return false;
        }
    }

    public static class FakeDirectShuffleFetcher<K, V>
    extends DirectShuffleFetcher<K, V> {
        public FakeDirectShuffleFetcher(int id, JobConf job, TaskAttemptID reduceId, DirectShuffleSchedulerImpl<K, V> scheduler, DirectShuffleMergeManagerImpl<K, V> merger, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, MapOutputFile mapOutputFile, FileSystem rfs) {
            super(id, job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter, mapOutputFile);
            this.rfs = rfs;
        }
    }
}

