package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapRFsOutputFile;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;

/* JADX WARN: Classes with same name are omitted:
  input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleMergeManager.class
 */
/* loaded from: input_file:hadoop-mapreduce-client-contrib-2.5.1-mapr-1410-SNAPSHOT-tests.jar:org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleMergeManager.class */
public class TestDirectShuffleMergeManager {

    /* JADX WARN: Classes with same name are omitted:
      input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleMergeManager$StubbedMergeManager.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-contrib-2.5.1-mapr-1410-SNAPSHOT-tests.jar:org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleMergeManager$StubbedMergeManager.class */
    /**
     * Merge manager used by the tests: its in-memory merger is a {@link TestMergeThread}, so the
     * test can count merges and gate them with two barriers instead of performing real merges.
     */
    private static class StubbedMergeManager extends DirectShuffleMergeManagerImpl<Text, Text> {
        // Intentionally declared without an initializer. The constructor dereferences this field
        // right after super(...) returns, so it is presumably assigned by createInMemoryMerger()
        // during superclass construction; a field initializer would run after super(...) and
        // overwrite that value.
        private TestMergeThread mergeThread;

        /**
         * Builds a manager over mocked file-system collaborators and wires the barriers into the
         * merge thread.
         *
         * @param jobConf           job configuration forwarded to the superclass
         * @param exceptionReporter reporter forwarded to the superclass
         * @param cyclicBarrier     barrier the merge thread awaits first in each merge
         * @param cyclicBarrier2    barrier the merge thread awaits second in each merge
         * @throws IOException declared for the superclass constructor
         */
        public StubbedMergeManager(JobConf jobConf, ExceptionReporter exceptionReporter, CyclicBarrier cyclicBarrier, CyclicBarrier cyclicBarrier2) throws IOException {
            super(
                    null,
                    jobConf,
                    (FileSystem) Mockito.mock(LocalFileSystem.class),
                    null, null, null, null, null, null, null,
                    exceptionReporter,
                    null,
                    (MapOutputFile) Mockito.mock(MapOutputFile.class));
            mergeThread.setSyncBarriers(cyclicBarrier, cyclicBarrier2);
        }

        /**
         * Creates the instrumented merge thread and remembers it so the test can query it later.
         * (The decompiler widened this override from protected to public.)
         *
         * @return the newly created {@link TestMergeThread}
         */
        @Override // org.apache.hadoop.mapreduce.task.reduce.DirectShuffleMergeManagerImpl
        public MergeThread<DirectInMemoryOutput<Text, Text>, Text, Text> createInMemoryMerger() {
            TestMergeThread merger = new TestMergeThread(this, getExceptionReporter());
            mergeThread = merger;
            return merger;
        }

        /** @return how many times the instrumented merge thread has entered {@code merge}. */
        public int getNumMerges() {
            return mergeThread.getNumMerges();
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleMergeManager$TestExceptionReporter.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-contrib-2.5.1-mapr-1410-SNAPSHOT-tests.jar:org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleMergeManager$TestExceptionReporter.class */
    private static class TestExceptionReporter implements ExceptionReporter {
        private List<Throwable> exceptions;

        private TestExceptionReporter() {
            this.exceptions = new ArrayList();
        }

        public void reportException(Throwable th) {
            this.exceptions.add(th);
            th.printStackTrace();
        }

        public int getNumExceptions() {
            return this.exceptions.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleMergeManager$TestMergeThread.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-contrib-2.5.1-mapr-1410-SNAPSHOT-tests.jar:org/apache/hadoop/mapreduce/task/reduce/TestDirectShuffleMergeManager$TestMergeThread.class */
    public static class TestMergeThread extends MergeThread<DirectInMemoryOutput<Text, Text>, Text, Text> {
        private AtomicInteger numMerges;
        private CyclicBarrier mergeStart;
        private CyclicBarrier mergeComplete;

        public TestMergeThread(DirectShuffleMergeManagerImpl<Text, Text> directShuffleMergeManagerImpl, ExceptionReporter exceptionReporter) {
            super(directShuffleMergeManagerImpl, Integer.MAX_VALUE, exceptionReporter);
            this.numMerges = new AtomicInteger(0);
        }

        public synchronized void setSyncBarriers(CyclicBarrier cyclicBarrier, CyclicBarrier cyclicBarrier2) {
            this.mergeStart = cyclicBarrier;
            this.mergeComplete = cyclicBarrier2;
        }

        public int getNumMerges() {
            return this.numMerges.get();
        }

        public void merge(List<DirectInMemoryOutput<Text, Text>> list) throws IOException {
            synchronized (this) {
                this.numMerges.incrementAndGet();
                Iterator<DirectInMemoryOutput<Text, Text>> it = list.iterator();
                while (it.hasNext()) {
                    ((DirectShuffleMergeManagerImpl) this.manager).unreserve(it.next().getSize());
                }
            }
            try {
                this.mergeStart.await();
                this.mergeComplete.await();
            } catch (InterruptedException e) {
            } catch (BrokenBarrierException e2) {
            }
        }
    }

    /** Size, in bytes, of every reservation requested by {@link #testMemoryMerge()}. */
    private static final long MEMORY_MERGE_RESERVATION_BYTES = 7950L;

    /**
     * Drives the stubbed merge manager through in-memory reservations and commits, using the two
     * barriers to observe when each merge starts and to release it, and checks the merge count
     * at each step as well as that no exception was reported.
     *
     * <p>Reservations made on helper threads record any failure, which is rethrown on the test
     * thread after {@code join()}; an assertion that fails on a helper thread would otherwise
     * never reach JUnit.
     */
    @Test(timeout = 10000)
    public void testMemoryMerge() throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 1.0f);
        jobConf.setLong("mapreduce.reduce.memory.totalbytes", 10000L);
        jobConf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.8f);
        jobConf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.9f);
        jobConf.set("mapred.ifile.outputstream", "org.apache.hadoop.mapred.MapRIFileOutputStream");
        jobConf.set("mapred.ifile.inputstream", "org.apache.hadoop.mapred.MapRIFileInputStream");
        jobConf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        TestExceptionReporter reporter = new TestExceptionReporter();
        CyclicBarrier mergeStart = new CyclicBarrier(2);
        CyclicBarrier mergeComplete = new CyclicBarrier(2);
        final StubbedMergeManager manager = new StubbedMergeManager(jobConf, reporter, mergeStart, mergeComplete);

        // Two filled, not yet committed outputs.
        DirectInMemoryOutput<Text, Text> out1 = asInMemoryOutput(manager.reserve(null, MEMORY_MERGE_RESERVATION_BYTES, 0));
        fillOutput(out1);
        DirectInMemoryOutput<Text, Text> out2 = asInMemoryOutput(manager.reserve(null, MEMORY_MERGE_RESERVATION_BYTES, 0));
        fillOutput(out2);

        // A further reservation runs on its own thread while the first two are committed.
        Throwable[] firstFailure = new Throwable[1];
        Thread firstReserver = newReserveFillCommitThread(manager, firstFailure);
        firstReserver.start();
        out1.commit();
        out2.commit();
        firstReserver.join();
        failIfBackgroundFailed(firstFailure);

        // Rendezvous with the start of the first merge.
        mergeStart.await();
        Assert.assertEquals(1L, manager.getNumMerges());

        // Reserve and commit another output while that merge is still held open.
        DirectInMemoryOutput<Text, Text> out3 = asInMemoryOutput(manager.reserve(null, MEMORY_MERGE_RESERVATION_BYTES, 0));
        fillOutput(out3);
        out3.commit();

        // Let the first merge finish.
        mergeComplete.await();

        DirectInMemoryOutput<Text, Text> out4 = asInMemoryOutput(manager.reserve(null, MEMORY_MERGE_RESERVATION_BYTES, 0));
        fillOutput(out4);

        Throwable[] secondFailure = new Throwable[1];
        Thread secondReserver = newReserveFillCommitThread(manager, secondFailure);
        secondReserver.start();
        out4.commit();
        secondReserver.join();
        failIfBackgroundFailed(secondFailure);

        // Rendezvous with the start of the second merge.
        mergeStart.await();
        Assert.assertEquals(2L, manager.getNumMerges());

        // Let the second merge finish.
        mergeComplete.await();
        // NOTE(review): this expects a third merge to have been counted by the time the barrier
        // above returns; confirm the manager guarantees that ordering, otherwise this check can race.
        Assert.assertEquals(3L, manager.getNumMerges());
        Assert.assertEquals("exception reporter invoked", 0L, reporter.getNumExceptions());
    }

    /** Asserts that the reservation is an in-memory output and returns it as such. */
    private static DirectInMemoryOutput<Text, Text> asInMemoryOutput(MapOutput<Text, Text> reserved) {
        Assert.assertTrue("Should be a memory merge", reserved instanceof DirectInMemoryOutput);
        return (DirectInMemoryOutput<Text, Text>) reserved;
    }

    /**
     * Creates (without starting) a thread that reserves, fills and commits one in-memory output.
     * Anything thrown on that thread is stored in {@code failure[0]}; the caller reads it after
     * {@code join()}, which makes the write visible.
     */
    private Thread newReserveFillCommitThread(final StubbedMergeManager manager, final Throwable[] failure) {
        return new Thread(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    MapOutput<Text, Text> reserved = manager.reserve(null, MEMORY_MERGE_RESERVATION_BYTES, 0);
                    Assert.assertNotNull("Should be told to wait", reserved);
                    DirectInMemoryOutput<Text, Text> output = asInMemoryOutput(reserved);
                    fillOutput(output);
                    output.commit();
                } catch (Throwable t) {
                    failure[0] = t;
                }
            }
        });
    }

    /** Fails the calling (test) thread if the helper thread recorded a failure. */
    private static void failIfBackgroundFailed(Throwable[] failure) {
        if (failure[0] != null) {
            AssertionError error = new AssertionError("Background reservation failed: " + failure[0]);
            error.initCause(failure[0]);
            throw error;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fills the output's backing array stream up to its limit, writing the running index as each
     * byte value.
     *
     * @param directInMemoryOutput output whose array stream is filled
     * @throws IOException if writing to the stream fails
     */
    public void fillOutput(DirectInMemoryOutput<Text, Text> directInMemoryOutput) throws IOException {
        final BoundedByteArrayOutputStream sink = directInMemoryOutput.getArrayStream();
        // The limit is read once up front, exactly as many bytes as the stream accepts.
        final int capacity = sink.getLimit();
        int next = 0;
        while (next < capacity) {
            sink.write(next);
            next++;
        }
    }

    /**
     * Checks that the on-disk merger picks up the configured sort factor and that the groups of
     * files it queues for merging hold at most that many files, ordered by ascending length.
     */
    @Test(timeout = 10000)
    public void testOnDiskMerger() throws IOException, URISyntaxException, InterruptedException {
        final int sortFactor = 5;
        final int fileCount = 10;

        JobConf jobConf = new JobConf();
        jobConf.setInt("mapreduce.task.io.sort.factor", sortFactor);
        jobConf.set("mapred.ifile.outputstream", "org.apache.hadoop.mapred.MapRIFileOutputStream");
        jobConf.set("mapred.ifile.inputstream", "org.apache.hadoop.mapred.MapRIFileInputStream");
        jobConf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        DirectShuffleMergeManagerImpl<Text, Text> manager = new DirectShuffleMergeManagerImpl<Text, Text>(null, jobConf, FileSystem.getLocal(jobConf), null, null, null, null, null, null, null, null, null, new MapRFsOutputFile());

        MergeThread<?, ?, ?> onDiskMerger = (MergeThread<?, ?, ?>) Whitebox.getInternalState(manager, "onDiskMerger");
        int mergeFactor = ((Integer) Whitebox.getInternalState(onDiskMerger, "mergeFactor")).intValue();
        Assert.assertEquals(sortFactor, mergeFactor);

        // Stop the merger so the queued groups can be inspected before they are consumed.
        // NOTE(review): presumably java.lang.Thread#suspend, which is deprecated and unsupported
        // on recent JDKs; confirm before running this test on a newer runtime.
        onDiskMerger.suspend();

        Random random = new Random();
        for (int i = 0; i < fileCount; i++) {
            // nextInt(bound) is never negative; Math.abs(nextInt()) is negative for
            // Integer.MIN_VALUE and would yield a file with a negative length.
            long length = random.nextInt(Integer.MAX_VALUE);
            manager.closeOnDiskFile(new FileStatus(length, false, 0, 0L, 0L, new Path("somePath")));
        }

        List<?> pendingToBeMerged = (List<?>) Whitebox.getInternalState(onDiskMerger, "pendingToBeMerged");
        Assert.assertTrue("No inputs were added to list pending to merge", pendingToBeMerged.size() > 0);
        for (Object pending : pendingToBeMerged) {
            List<?> inputs = (List<?>) pending;
            Assert.assertTrue("Not enough / too many inputs were going to be merged", inputs.size() > 0 && inputs.size() <= sortFactor);
            for (int i = 1; i < inputs.size(); i++) {
                long previousLength = ((FileStatus) inputs.get(i - 1)).getLen();
                long currentLength = ((FileStatus) inputs.get(i)).getLen();
                Assert.assertTrue("Inputs to be merged were not sorted according to size: ", currentLength >= previousLength);
            }
        }
    }
}
