/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.task.reduce.DirectInMemoryOutput;
import org.apache.hadoop.mapreduce.task.reduce.DirectShuffleMergeManagerImpl;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeThread;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;

public class TestDirectShuffleMergeManager {
    /**
     * Drives a {@link StubbedMergeManager} through several in-memory merges and checks the
     * merge count reported by the stub after each barrier hand-off, then verifies that the
     * exception reporter was never invoked.
     *
     * <p>Some reservations are issued from a helper thread. Anything that thread throws
     * (including JUnit assertion errors) is captured and rethrown on the test thread right
     * after {@code join()}, because a failure on a non-test thread is invisible to JUnit.
     *
     * @throws Exception if a reservation, a commit or a barrier wait fails
     */
    @Test(timeout=10000L)
    public void testMemoryMerge() throws Exception {
        final int TOTAL_MEM_BYTES = 10000;
        final int OUTPUT_SIZE = 7950;
        JobConf conf = new JobConf();
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 1.0f);
        conf.setLong("mapreduce.reduce.memory.totalbytes", TOTAL_MEM_BYTES);
        conf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.8f);
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.9f);
        conf.set("mapred.ifile.outputstream", "org.apache.hadoop.mapred.MapRIFileOutputStream");
        conf.set("mapred.ifile.inputstream", "org.apache.hadoop.mapred.MapRIFileInputStream");
        conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        TestExceptionReporter reporter = new TestExceptionReporter();
        // Two parties each: this test thread and the stubbed merge thread.
        CyclicBarrier mergeStart = new CyclicBarrier(2);
        CyclicBarrier mergeComplete = new CyclicBarrier(2);
        final StubbedMergeManager mgr = new StubbedMergeManager(conf, reporter, mergeStart, mergeComplete);
        final AtomicReference<Throwable> workerFailure = new AtomicReference<Throwable>();

        // Reserve and fill two outputs; neither is committed yet.
        DirectInMemoryOutput<Text, Text> mout1 = asInMemory(mgr.reserve(null, OUTPUT_SIZE, 0));
        this.fillOutput(mout1);
        DirectInMemoryOutput<Text, Text> mout2 = asInMemory(mgr.reserve(null, OUTPUT_SIZE, 0));
        this.fillOutput(mout2);

        // The third reservation runs on its own thread.
        // NOTE(review): presumably reserve() blocks there until the merge triggered by the
        // commits below releases memory -- confirm against DirectShuffleMergeManagerImpl.
        Thread t = this.newReservingWorker(mgr, OUTPUT_SIZE, workerFailure);
        t.start();
        mout1.commit();
        mout2.commit();
        t.join();
        rethrowWorkerFailure(workerFailure);

        // Rendezvous with the stubbed merge thread, which has already counted the merge.
        mergeStart.await();
        Assert.assertEquals(1, mgr.getNumMerges());

        mout1 = asInMemory(mgr.reserve(null, OUTPUT_SIZE, 0));
        this.fillOutput(mout1);
        mout1.commit();
        // Let the first merge finish.
        mergeComplete.await();

        mout2 = asInMemory(mgr.reserve(null, OUTPUT_SIZE, 0));
        this.fillOutput(mout2);
        Thread t1 = this.newReservingWorker(mgr, OUTPUT_SIZE, workerFailure);
        t1.start();
        mout2.commit();
        t1.join();
        rethrowWorkerFailure(workerFailure);

        mergeStart.await();
        Assert.assertEquals(2, mgr.getNumMerges());
        mergeComplete.await();
        // NOTE(review): this expects a third merge to have been counted by the time this
        // thread resumes from the barrier; that looks timing-dependent -- confirm.
        Assert.assertEquals(3, mgr.getNumMerges());
        Assert.assertEquals("exception reporter invoked", 0, reporter.getNumExceptions());
    }

    /**
     * Asserts that a reservation produced an in-memory output and returns it with its
     * concrete type.
     *
     * @param out the value returned by {@code reserve}
     * @return {@code out} cast to {@link DirectInMemoryOutput}
     */
    @SuppressWarnings("unchecked")
    private static DirectInMemoryOutput<Text, Text> asInMemory(MapOutput<Text, Text> out) {
        Assert.assertTrue("Should be a memory merge", out instanceof DirectInMemoryOutput);
        return (DirectInMemoryOutput<Text, Text>) out;
    }

    /**
     * Builds (but does not start) a thread that reserves one output, fills it and commits it.
     *
     * @param mgr merge manager to reserve from
     * @param size number of bytes to reserve
     * @param failure receives the first throwable raised on the worker thread, if any
     * @return the unstarted worker thread
     */
    private Thread newReservingWorker(final StubbedMergeManager mgr, final int size, final AtomicReference<Throwable> failure) {
        return new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    MapOutput<Text, Text> out3 = mgr.reserve(null, size, 0);
                    // NOTE(review): the message wording does not match the non-null check;
                    // the text is kept unchanged.
                    Assert.assertNotNull("Should be told to wait", out3);
                    DirectInMemoryOutput<Text, Text> mout3 = asInMemory(out3);
                    TestDirectShuffleMergeManager.this.fillOutput(mout3);
                    mout3.commit();
                } catch (Throwable e) {
                    // Captured for the test thread; throwing here would only kill this worker.
                    failure.compareAndSet(null, e);
                }
            }
        });
    }

    /**
     * Fails the calling (test) thread if a worker thread recorded a failure.
     *
     * @param failure holder filled in by {@link #newReservingWorker}
     */
    private static void rethrowWorkerFailure(AtomicReference<Throwable> failure) {
        Throwable cause = failure.get();
        if (cause == null) {
            return;
        }
        AssertionError error = new AssertionError("Reservation worker thread failed: " + cause);
        error.initCause(cause);
        throw error;
    }

    /**
     * Fills the output's backing array stream up to its limit, writing the running byte
     * index as the value of each byte.
     *
     * @param output in-memory output whose array stream is filled
     * @throws IOException if a write to the stream fails
     */
    private void fillOutput(DirectInMemoryOutput<Text, Text> output) throws IOException {
        final BoundedByteArrayOutputStream sink = output.getArrayStream();
        // The limit is read once up front, before any byte is written.
        final int capacity = sink.getLimit();
        int next = 0;
        while (next < capacity) {
            sink.write(next);
            next++;
        }
    }

    /**
     * Checks that the on-disk merger picks up the configured sort factor and that files
     * handed to {@code closeOnDiskFile} are queued in batches of at most that many inputs,
     * each batch ordered by ascending file length.
     *
     * @throws IOException if the local file system or the merge manager cannot be set up
     * @throws URISyntaxException declared for signature compatibility
     * @throws InterruptedException declared for signature compatibility
     */
    @Test(timeout=10000L)
    public void testOnDiskMerger() throws IOException, URISyntaxException, InterruptedException {
        final int SORT_FACTOR = 5;
        JobConf jobConf = new JobConf();
        jobConf.setInt("mapreduce.task.io.sort.factor", SORT_FACTOR);
        jobConf.set("mapred.ifile.outputstream", "org.apache.hadoop.mapred.MapRIFileOutputStream");
        jobConf.set("mapred.ifile.inputstream", "org.apache.hadoop.mapred.MapRIFileInputStream");
        jobConf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        MapOutputFile mapOutputFile = Mockito.mock(MapOutputFile.class);
        Mockito.doNothing().when(mapOutputFile).setConf((Configuration) Matchers.any(JobConf.class));
        FileSystem fs = FileSystem.getLocal(jobConf);
        DirectShuffleMergeManagerImpl<Text, Text> manager = new DirectShuffleMergeManagerImpl<Text, Text>(null, jobConf, fs, null, null, null, null, null, null, null, null, null, mapOutputFile);
        MergeThread<?, ?, ?> onDiskMerger = (MergeThread<?, ?, ?>) Whitebox.getInternalState(manager, "onDiskMerger");
        int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger, "mergeFactor");
        Assert.assertEquals(SORT_FACTOR, mergeFactor);

        // Freeze the merger so the pending queue can be inspected before it is drained.
        // NOTE(review): if this resolves to Thread.suspend() it is deprecated and unsupported
        // on recent JDKs -- confirm and consider a different interception mechanism.
        onDiskMerger.suspend();

        Random rand = new Random();
        for (int i = 0; i < 2 * SORT_FACTOR; ++i) {
            Path path = new Path("somePath");
            // nextInt(bound) is never negative, unlike Math.abs(nextInt()), which returns
            // Integer.MIN_VALUE when nextInt() yields Integer.MIN_VALUE.
            long length = rand.nextInt(Integer.MAX_VALUE);
            FileStatus cap = new FileStatus(length, false, 0, 0L, 0L, path);
            manager.closeOnDiskFile(cap);
        }

        @SuppressWarnings("unchecked")
        LinkedList<List<FileStatus>> pendingToBeMerged = (LinkedList<List<FileStatus>>) Whitebox.getInternalState(onDiskMerger, "pendingToBeMerged");
        Assert.assertTrue("No inputs were added to list pending to merge", pendingToBeMerged.size() > 0);
        for (List<FileStatus> inputs : pendingToBeMerged) {
            Assert.assertTrue("Not enough / too many inputs were going to be merged", inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
            for (int j = 1; j < inputs.size(); ++j) {
                Assert.assertTrue("Inputs to be merged were not sorted according to size: ", inputs.get(j).getLen() >= inputs.get(j - 1).getLen());
            }
        }
    }

    /**
     * Exception reporter for tests: remembers every throwable it is handed, prints its
     * stack trace, and exposes how many have been reported so far.
     *
     * <p>NOTE(review): the backing list is a plain {@code ArrayList} with no synchronization;
     * confirm whether reports can arrive from a thread other than the one reading the count.
     */
    private static class TestExceptionReporter
    implements ExceptionReporter {
        private final List<Throwable> reported = new ArrayList<Throwable>();

        private TestExceptionReporter() {
        }

        /**
         * Records the throwable and dumps its stack trace.
         *
         * @param t the reported throwable
         */
        public void reportException(Throwable t) {
            reported.add(t);
            t.printStackTrace();
        }

        /**
         * @return the number of throwables reported so far
         */
        public int getNumExceptions() {
            return reported.size();
        }
    }

    /**
     * Merge thread stub that performs no real merging: each {@link #merge} call counts the
     * merge, releases the reserved memory of every input, and then rendezvouses with the
     * test on two barriers so the test can observe the merge "in progress".
     */
    private static class TestMergeThread
    extends MergeThread<DirectInMemoryOutput<Text, Text>, Text, Text> {
        private final AtomicInteger numMerges = new AtomicInteger(0);
        // Written by setSyncBarriers and read by merge, both under this object's monitor.
        private CyclicBarrier mergeStart;
        private CyclicBarrier mergeComplete;

        /**
         * @param mergeManager manager whose reservations are released by {@link #merge}
         * @param reporter exception reporter passed through to the superclass
         */
        public TestMergeThread(DirectShuffleMergeManagerImpl<Text, Text> mergeManager, ExceptionReporter reporter) {
            super(mergeManager, Integer.MAX_VALUE, reporter);
        }

        /**
         * Installs the barriers that {@link #merge} waits on.
         *
         * @param mergeStart barrier awaited once a merge has been counted
         * @param mergeComplete barrier awaited before the merge call returns
         */
        public synchronized void setSyncBarriers(CyclicBarrier mergeStart, CyclicBarrier mergeComplete) {
            this.mergeStart = mergeStart;
            this.mergeComplete = mergeComplete;
        }

        /**
         * @return the number of merge calls counted so far
         */
        public int getNumMerges() {
            return this.numMerges.get();
        }

        /**
         * Counts the merge, unreserves the size of every input, then waits on the start
         * barrier followed by the completion barrier.
         *
         * @param inputs outputs whose reserved memory is released
         * @throws IOException if one of the barriers is broken while waiting
         */
        public void merge(List<DirectInMemoryOutput<Text, Text>> inputs) throws IOException {
            final CyclicBarrier start;
            final CyclicBarrier complete;
            synchronized (this) {
                this.numMerges.incrementAndGet();
                for (DirectInMemoryOutput<Text, Text> input : inputs) {
                    ((DirectShuffleMergeManagerImpl<?, ?>) this.manager).unreserve(input.getSize());
                }
                // Snapshot the barriers under the same monitor the setter uses, so the
                // values written by setSyncBarriers are guaranteed to be visible here.
                start = this.mergeStart;
                complete = this.mergeComplete;
            }
            try {
                start.await();
                complete.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the code that drives this thread.
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                // A broken barrier means the test and this thread are no longer in step;
                // surface it instead of silently carrying on.
                throw new IOException("Merge synchronization barrier was broken", e);
            }
        }
    }

    /**
     * Merge manager whose in-memory merger is a {@link TestMergeThread}, so tests can count
     * merges and step through them with barriers.
     */
    private static class StubbedMergeManager
    extends DirectShuffleMergeManagerImpl<Text, Text> {
        // Deliberately has no initializer: the constructor dereferences this field right
        // after super(...) returns, so it must already have been set by then, and a field
        // initializer would run after super(...) and overwrite that value.
        private TestMergeThread mergeThread;

        /**
         * @param conf job configuration handed to the superclass
         * @param reporter exception reporter handed to the superclass
         * @param mergeStart barrier installed on the stub merge thread
         * @param mergeComplete barrier installed on the stub merge thread
         * @throws IOException if the superclass constructor fails
         */
        public StubbedMergeManager(JobConf conf, ExceptionReporter reporter, CyclicBarrier mergeStart, CyclicBarrier mergeComplete) throws IOException {
            super(null, conf, (FileSystem)Mockito.mock(LocalFileSystem.class), null, null, null, null, null, null, null, reporter, null, (MapOutputFile)Mockito.mock(MapOutputFile.class));
            TestMergeThread stub = this.mergeThread;
            stub.setSyncBarriers(mergeStart, mergeComplete);
        }

        /**
         * @return the number of merges counted by the stub merge thread
         */
        public int getNumMerges() {
            return this.mergeThread.getNumMerges();
        }

        /**
         * Creates the stub merge thread and remembers it for later inspection.
         *
         * @return the newly created {@link TestMergeThread}
         */
        protected MergeThread<DirectInMemoryOutput<Text, Text>, Text, Text> createInMemoryMerger() {
            TestMergeThread created = new TestMergeThread(this, this.getExceptionReporter());
            this.mergeThread = created;
            return created;
        }
    }
}

