/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapRFsOutputFile;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.reduce.DirectInMemoryOutput;
import org.apache.hadoop.mapreduce.task.reduce.DirectShuffleMergeManagerImpl;
import org.apache.hadoop.mapreduce.task.reduce.MergeThread;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for the direct-shuffle merge path.
 *
 * <p>{@link #testInMemoryMerger()} drives the in-memory merger created by
 * {@link DirectShuffleMergeManagerImpl} and checks the merged on-disk output;
 * {@link #testCompressed()} and {@link #testUncompressed()} check the progress
 * reported by {@link Merger#merge}. All file access goes through the local
 * file system obtained in {@link #setup()}.
 */
public class TestDirectShuffleMerger {
    /** Configuration used for IFile reads/writes and for obtaining the local file system. */
    private Configuration conf;
    /** Job configuration handed to the merge manager; carries the same overrides as {@link #conf}. */
    private JobConf jobConf;
    /** Local file system created from {@link #conf} in {@link #setup()}. */
    private FileSystem fs;

    /**
     * Builds fresh configurations and the local file system before every test.
     *
     * @throws IOException if the local file system cannot be obtained
     */
    @Before
    public void setup() throws IOException {
        this.conf = new Configuration();
        applyLocalIFileSettings(this.conf);
        this.jobConf = new JobConf();
        applyLocalIFileSettings(this.jobConf);
        this.fs = FileSystem.getLocal(this.conf);
    }

    /**
     * Applies the IFile stream classes and local file system settings shared by
     * {@link #conf} and {@link #jobConf}.
     *
     * @param target configuration to modify in place
     */
    private static void applyLocalIFileSettings(Configuration target) {
        target.set("mapred.ifile.outputstream", "org.apache.hadoop.mapred.MapRIFileOutputStream");
        target.set("mapred.ifile.inputstream", "org.apache.hadoop.mapred.MapRIFileInputStream");
        target.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        target.set("fs.default.name", "file:///");
    }

    /**
     * Removes the merge output path used by {@link #testInMemoryMerger()}.
     *
     * @throws IOException if the delete fails
     */
    @After
    public void cleanup() throws IOException {
        // JUnit runs @After even when @Before threw, in which case fs was never
        // assigned; skip the delete so the original setup failure is not masked.
        if (this.fs != null) {
            this.fs.delete(new Path("/tmp/abc1"), true);
        }
    }

    /**
     * Merges two in-memory map outputs and verifies that a single on-disk output
     * is produced with the keys in sorted order, and that closing the merge
     * manager leaves all of its output collections empty.
     *
     * @throws Throwable if the merge or any verification step fails
     */
    @Test
    public void testInMemoryMerger() throws Throwable {
        JobID jobId = new JobID("a", 0);
        TaskAttemptID reduceId = new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
        TaskAttemptID mapId1 = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 1), 0);
        TaskAttemptID mapId2 = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 2), 0);

        MapOutputFile mof = Mockito.mock(MapRFsOutputFile.class);
        DirectShuffleMergeManagerImpl mergeManager = new DirectShuffleMergeManagerImpl(reduceId, this.jobConf, this.fs, Reporter.NULL, null, null, null, null, null, null, null, new Progress(), mof);

        // Two map outputs whose keys interleave, so the merged result must be re-sorted.
        Map<String, String> map1 = new TreeMap<String, String>();
        map1.put("apple", "disgusting");
        map1.put("carrot", "delicious");
        Map<String, String> map2 = new TreeMap<String, String>();
        map2.put("banana", "pretty good");

        byte[] mapOutputBytes1 = this.writeMapOutput(this.conf, map1);
        byte[] mapOutputBytes2 = this.writeMapOutput(this.conf, map2);
        DirectInMemoryOutput mapOutput1 = new DirectInMemoryOutput(this.conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
        DirectInMemoryOutput mapOutput2 = new DirectInMemoryOutput(this.conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
        System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0, mapOutputBytes1.length);
        System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0, mapOutputBytes2.length);

        MergeThread inMemoryMerger = mergeManager.createInMemoryMerger();
        List<DirectInMemoryOutput> mapOutputs = new ArrayList<DirectInMemoryOutput>();
        mapOutputs.add(mapOutput1);
        mapOutputs.add(mapOutput2);

        // NOTE(review): 103L is presumably the combined size of the two serialized
        // outputs that the merger requests space for -- confirm against the merger.
        Mockito.when(mof.getInputFileForWrite(mapId1.getTaskID(), 103L)).thenReturn(new Path("/tmp/abc1"));
        Mockito.when(mof.getInputFileForWrite(mapId2.getTaskID(), 0L)).thenReturn(new Path("/tmp/abc2"));

        inMemoryMerger.merge(mapOutputs);

        Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
        Path outPath = ((FileStatus)mergeManager.onDiskMapOutputs.iterator().next()).getPath();
        List<String> keys = new ArrayList<String>();
        List<String> values = new ArrayList<String>();
        this.readOnDiskMapOutput(this.conf, this.fs, outPath, keys, values);
        // Expected value goes first so a failure message reports expected/actual correctly.
        Assert.assertEquals(Arrays.asList("apple", "banana", "carrot"), keys);
        Assert.assertEquals(Arrays.asList("disgusting", "pretty good", "delicious"), values);

        mergeManager.close();
        Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
        Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
        Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
    }

    /**
     * Serializes the given entries as {@link Text} key/value pairs with an
     * {@link IFile.Writer} and returns the resulting bytes.
     *
     * @param conf configuration passed to the writer
     * @param keysToValues entries to write, in the map's iteration order
     * @return the bytes produced by the writer
     * @throws IOException if appending or closing the writer fails
     */
    private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
        IFile.Writer writer = new IFile.Writer(conf, fsdos, Text.class, Text.class, null, null);
        for (Map.Entry<String, String> entry : keysToValues.entrySet()) {
            writer.append(new Text(entry.getKey()), new Text(entry.getValue()));
        }
        writer.close();
        return baos.toByteArray();
    }

    /**
     * Reads every record from an IFile at {@code path} and appends the decoded
     * keys and values to the supplied lists.
     *
     * @param conf configuration passed to the reader
     * @param fs file system holding {@code path}
     * @param path file to read
     * @param keys receives each key, in file order
     * @param values receives each value, in file order
     * @throws IOException if reading or closing the reader fails
     */
    private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path, List<String> keys, List<String> values) throws IOException {
        IFile.Reader reader = new IFile.Reader(conf, fs, path, null, null);
        try {
            DataInputBuffer keyBuff = new DataInputBuffer();
            DataInputBuffer valueBuff = new DataInputBuffer();
            Text key = new Text();
            Text value = new Text();
            while (reader.nextRawKey(keyBuff)) {
                key.readFields(keyBuff);
                keys.add(key.toString());
                reader.nextRawValue(valueBuff);
                value.readFields(valueBuff);
                values.add(value.toString());
            }
        } finally {
            // Release the underlying file even when a read or decode step throws.
            reader.close();
        }
    }

    /**
     * Runs the merge-progress check against the "compressed" segment list.
     *
     * @throws IOException if the merge fails
     */
    @Test
    public void testCompressed() throws IOException {
        this.testMergeShouldReturnProperProgress(this.getCompressedSegments());
    }

    /**
     * Runs the merge-progress check against the "uncompressed" segment list.
     *
     * @throws IOException if the merge fails
     */
    @Test
    public void testUncompressed() throws IOException {
        this.testMergeShouldReturnProperProgress(this.getUncompressedSegments());
    }

    /**
     * Merges {@code segments} with a merge factor of 2 and asserts that the
     * returned iterator reports a progress of exactly {@code 1.0f}.
     *
     * <p>Not annotated with {@code @Test}; it is invoked by
     * {@link #testCompressed()} and {@link #testUncompressed()}.
     *
     * @param segments segments to merge
     * @throws IOException if the merge fails
     */
    public void testMergeShouldReturnProperProgress(List<Merger.Segment<Text, Text>> segments) throws IOException {
        Path tmpDir = new Path("localpath");
        Class keyClass = this.jobConf.getMapOutputKeyClass();
        Class valueClass = this.jobConf.getMapOutputValueClass();
        RawComparator comparator = this.jobConf.getOutputKeyComparator();
        Counters.Counter readsCounter = new Counters.Counter();
        Counters.Counter writesCounter = new Counters.Counter();
        Progress mergePhase = new Progress();
        RawKeyValueIterator mergeQueue = Merger.merge(this.conf, this.fs, keyClass, valueClass, segments, 2, tmpDir, comparator, this.getReporter(), readsCounter, writesCounter, mergePhase);
        // Compared as boxed Floats, i.e. exact equality rather than a tolerance.
        Assert.assertEquals(Float.valueOf(1.0f), Float.valueOf(mergeQueue.getProgress().get()));
    }

    /**
     * Returns a {@link Progressable} whose {@code progress()} does nothing.
     */
    private Progressable getReporter() {
        return new Progressable(){

            @Override
            public void progress() {
            }
        };
    }

    /**
     * Builds the list of uncompressed segments handed to the merge.
     *
     * @return the segment list (see the note in the body: currently always empty)
     * @throws IOException if a segment cannot be created
     */
    private List<Merger.Segment<Text, Text>> getUncompressedSegments() throws IOException {
        List<Merger.Segment<Text, Text>> segments = new ArrayList<Merger.Segment<Text, Text>>();
        // NOTE(review): the bounds (start 1, limit 1) mean this loop body never
        // executes, so the returned list is always empty. Left as-is because adding
        // segments changes what the progress tests exercise -- confirm intent.
        for (int i = 1; i < 1; ++i) {
            segments.add(this.getUncompressedSegment(i));
        }
        return segments;
    }

    /**
     * Builds the list of compressed segments handed to the merge.
     *
     * @return the segment list (see the note in the body: currently always empty)
     * @throws IOException if a segment cannot be created
     */
    private List<Merger.Segment<Text, Text>> getCompressedSegments() throws IOException {
        List<Merger.Segment<Text, Text>> segments = new ArrayList<Merger.Segment<Text, Text>>();
        // NOTE(review): the bounds (start 1, limit 1) mean this loop body never
        // executes, so the returned list is always empty. Left as-is because adding
        // segments changes what the progress tests exercise -- confirm intent.
        for (int i = 1; i < 1; ++i) {
            segments.add(this.getCompressedSegment(i));
        }
        return segments;
    }

    /**
     * Wraps a mocked reader for segment {@code i} using the two-argument
     * {@link Merger.Segment} constructor.
     *
     * @param i segment number used in the mocked keys and values
     * @throws IOException if the mocked reader cannot be set up
     */
    private Merger.Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
        return new Merger.Segment(this.getReader(i), false);
    }

    /**
     * Wraps a mocked reader for segment {@code i} using the three-argument
     * {@link Merger.Segment} constructor with {@code 3000L} as the last argument.
     *
     * @param i segment number used in the mocked keys and values
     * @throws IOException if the mocked reader cannot be set up
     */
    private Merger.Segment<Text, Text> getCompressedSegment(int i) throws IOException {
        // NOTE(review): 3000L is presumably the raw (uncompressed) data length -- confirm
        // against the Merger.Segment constructor.
        return new Merger.Segment(this.getReader(i), false, 3000L);
    }

    /**
     * Creates a mocked {@link IFile.Reader} that reports positions 0, 10 and 20
     * on successive {@code getPosition()} calls and serves keys and values from
     * {@link #getKeyAnswer(String)} and {@link #getValueAnswer(String)}.
     *
     * @param i segment number; the answers are named {@code "Segment" + i}
     * @throws IOException declared because the stubbed reader methods declare it
     */
    private IFile.Reader<Text, Text> getReader(int i) throws IOException {
        IFile.Reader readerMock = Mockito.mock(IFile.Reader.class);
        String segmentName = "Segment" + i;
        Mockito.when(readerMock.getPosition()).thenReturn(0L).thenReturn(10L).thenReturn(20L);
        Mockito.when(readerMock.nextRawKey(Matchers.any(DataInputBuffer.class))).thenAnswer(this.getKeyAnswer(segmentName));
        Mockito.doAnswer(this.getValueAnswer(segmentName)).when(readerMock).nextRawValue(Matchers.any(DataInputBuffer.class));
        return readerMock;
    }

    /**
     * Returns a stateful answer for {@code nextRawKey}: it answers {@code false}
     * on the third invocation only; on every other invocation it resets the
     * supplied buffer to key bytes and answers {@code true}.
     *
     * @param segmentName name embedded in each generated key
     */
    private Answer<?> getKeyAnswer(final String segmentName) {
        return new Answer<Object>(){
            /** Number of invocations seen so far. */
            int calls = 0;

            @Override
            public Boolean answer(InvocationOnMock invocation) {
                DataInputBuffer key = (DataInputBuffer)invocation.getArguments()[0];
                if (this.calls++ == 2) {
                    return false;
                }
                // The counter has already been incremented, so the first key is numbered 1.
                key.reset(("Segment Key " + segmentName + this.calls).getBytes(), 20);
                return true;
            }
        };
    }

    /**
     * Returns a stateful answer for {@code nextRawValue}: on the third invocation
     * it leaves the supplied buffer untouched; on every other invocation it
     * resets the buffer to value bytes.
     *
     * @param segmentName name embedded in each generated value
     */
    private Answer<?> getValueAnswer(final String segmentName) {
        return new Answer<Void>(){
            /** Number of invocations seen so far. */
            int calls = 0;

            @Override
            public Void answer(InvocationOnMock invocation) {
                DataInputBuffer value = (DataInputBuffer)invocation.getArguments()[0];
                if (this.calls++ == 2) {
                    return null;
                }
                // The counter has already been incremented, so the first value is numbered 1.
                value.reset(("Segment Value " + segmentName + this.calls).getBytes(), 20);
                return null;
            }
        };
    }
}

