/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URLConnection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.IndexRecord;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.MapOutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.MapReduceLocalData;
import org.apache.hadoop.mapred.MapTask;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.ReduceTask;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapred.SpillRecord;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progress;

public class TestLimitNQuery
extends TestCase {
    private static final int NUM_HADOOP_DATA_NODES = 2;
    private static final int NUM_MAPPERS = 50;
    private static final int NUM_LINES = 1000;
    private static final Path INPUT_DIR = new Path("/testplugin/input");
    private static final Path DUMMY_OUTPUT = new Path("/testplugin/dummy");
    private static final Path REAL_OUTPUT = new Path("/testplugin/output");
    private static final String RECORD_LIMIT_ATTR = "record.limit.count";
    private static final int RECORD_LIMIT = 10;
    private static final int MIN_KEY_VALUE_TO_ACCEPT = 50;
    private static final int MAX_KEY_VALUE = 1000;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testLimitNQuery() throws Exception {
        MiniDFSCluster dfsCluster = null;
        MiniMRCluster mrCluster = null;
        DistributedFileSystem fileSystem = null;
        try {
            Configuration conf = new Configuration();
            dfsCluster = new MiniDFSCluster(conf, 2, true, null);
            fileSystem = dfsCluster.getFileSystem();
            mrCluster = new MiniMRCluster(2, fileSystem.getUri().toString(), 1);
            this.createInput((FileSystem)fileSystem);
            this.runLimitNQueryTest(new JobConf((Configuration)mrCluster.createJobConf()), (FileSystem)fileSystem);
        }
        finally {
            if (dfsCluster != null) {
                dfsCluster.shutdown();
            }
            if (mrCluster != null) {
                mrCluster.shutdown();
            }
        }
    }

    private void createInput(FileSystem fs) throws Exception {
        fs.delete(INPUT_DIR, true);
        for (int i = 0; i < 50; ++i) {
            FSDataOutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
            OutputStreamWriter writer = new OutputStreamWriter((OutputStream)os);
            for (int j = 0; j < 1000; ++j) {
                int randomKey = (int)(1000.0 * Math.random());
                writer.write("" + randomKey + " " + randomKey + "\n");
            }
            ((Writer)writer).close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runLimitNQueryTest(JobConf job, FileSystem fileSystem) throws Exception {
        fileSystem.delete(DUMMY_OUTPUT, true);
        fileSystem.delete(REAL_OUTPUT, true);
        job.setJobName("LimitNQueryTest");
        JobClient client = new JobClient(job);
        RunningJob submittedJob = null;
        FileInputFormat.setInputPaths((JobConf)job, (Path[])new Path[]{INPUT_DIR});
        FileOutputFormat.setOutputPath((JobConf)job, (Path)DUMMY_OUTPUT);
        job.set("mapred.textoutputformat.separator", " ");
        job.setMaxMapAttempts(1);
        job.setMaxReduceAttempts(1);
        job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
        job.setInt(RECORD_LIMIT_ATTR, 10);
        job.setInputFormat(TextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MapperWithConditionBasedFiltering.class);
        job.setReducerClass(LimitNReducer.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setNumReduceTasks(1);
        job.set("mapreduce.job.map.output.collector.class", MapOutputCopier.class.getName());
        job.set("mapreduce.job.reduce.shuffle.consumer.plugin.class", ReduceInputCopier.class.getName());
        try {
            submittedJob = client.submitJob(job);
            try {
                if (!client.monitorAndPrintJob(job, submittedJob)) {
                    throw new IOException("Job failed!");
                }
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        catch (IOException ioe) {
            System.err.println("Job failed with: " + ioe);
        }
        finally {
            this.verifyOutput(submittedJob, fileSystem);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem) throws Exception {
        FSDataInputStream dis = null;
        long numRecordsSelected = 0L;
        long numInvalidRecords = 0L;
        long numMappersLaunched = 50L;
        try {
            String record;
            dis = fileSystem.open(new Path(REAL_OUTPUT, "finalOut"));
            while ((record = dis.readLine()) != null) {
                int blankPos = record.indexOf(" ");
                String keyString = record.substring(0, blankPos);
                String valueString = record.substring(blankPos + 1);
                int keyValue = 0;
                int valueValue = 0;
                try {
                    keyValue = Integer.parseInt(keyString);
                    valueValue = Integer.parseInt(valueString);
                }
                catch (NumberFormatException nfe) {
                    System.err.println(nfe);
                }
                if (keyValue >= 50 && keyValue <= 1000 && keyValue == valueValue) {
                    ++numRecordsSelected;
                    continue;
                }
                ++numInvalidRecords;
            }
        }
        finally {
            if (dis != null) {
                dis.close();
            }
        }
        TestLimitNQuery.assertEquals((long)0L, (long)numInvalidRecords);
        TestLimitNQuery.assertEquals((long)10L, (long)numRecordsSelected);
        Counters counters = submittedJob.getCounters();
        numMappersLaunched = counters.getCounter((Enum)JobInProgress.Counter.TOTAL_LAUNCHED_MAPS);
        TestLimitNQuery.assertTrue((numMappersLaunched < 50L ? 1 : 0) != 0);
    }

    static class KeyValueReader<K, V> {
        private Class<K> keyClass;
        private Class<V> valueClass;
        private byte[] keyStorage;
        private byte[] valueStorage;
        private DataInputBuffer keyBuffer;
        private DataInputBuffer valueBuffer;
        private DataInputStream inputStream;
        private boolean moreToGo;
        private K key;
        private V value;

        public KeyValueReader(Configuration conf, InputStream input, Class<K> kyClass, Class<V> valClass) throws IOException {
            this.keyClass = kyClass;
            this.valueClass = valClass;
            this.keyStorage = new byte[0];
            this.valueStorage = new byte[0];
            this.keyBuffer = new DataInputBuffer();
            this.valueBuffer = new DataInputBuffer();
            this.inputStream = new DataInputStream(input);
            this.moreToGo = true;
        }

        public boolean next() throws IOException {
            if (this.moreToGo) {
                int currentKeyLength = WritableUtils.readVInt((DataInput)this.inputStream);
                int currentValueLength = WritableUtils.readVInt((DataInput)this.inputStream);
                if (currentKeyLength == -1 && currentValueLength == -1) {
                    this.moreToGo = false;
                } else {
                    if (this.keyStorage.length < currentKeyLength) {
                        this.keyStorage = new byte[currentKeyLength << 1];
                    }
                    if (this.valueStorage.length < currentValueLength) {
                        this.valueStorage = new byte[currentValueLength << 1];
                    }
                    if (this.inputStream.read(this.keyStorage, 0, currentKeyLength) != currentKeyLength || this.inputStream.read(this.valueStorage, 0, currentValueLength) != currentValueLength) {
                        throw new IOException("Error reading key and value.  Premature end of input");
                    }
                    this.keyBuffer.reset(this.keyStorage, currentKeyLength);
                    this.valueBuffer.reset(this.valueStorage, currentValueLength);
                }
            }
            return this.moreToGo;
        }

        public DataInputBuffer getKey() throws IOException {
            if (this.moreToGo) {
                return this.keyBuffer;
            }
            throw new NoSuchElementException();
        }

        public DataInputBuffer getValue() throws IOException {
            if (this.moreToGo) {
                return this.valueBuffer;
            }
            throw new NoSuchElementException();
        }

        public Progress getProgress() {
            return null;
        }

        public void close() throws IOException {
            this.moreToGo = false;
            this.inputStream.close();
        }
    }

    static class KeyValueWriter<K, V> {
        private Class<K> keyClass;
        private Class<V> valueClass;
        private DataOutputBuffer dataBuffer;
        private Serializer<K> keySerializer;
        private Serializer<V> valueSerializer;
        private DataOutputStream outputStream;

        public KeyValueWriter(Configuration conf, OutputStream output, Class<K> kyClass, Class<V> valClass) throws IOException {
            this.keyClass = kyClass;
            this.valueClass = valClass;
            this.dataBuffer = new DataOutputBuffer();
            SerializationFactory serializationFactory = new SerializationFactory(conf);
            this.keySerializer = serializationFactory.getSerializer(this.keyClass);
            this.keySerializer.open((OutputStream)this.dataBuffer);
            this.valueSerializer = serializationFactory.getSerializer(this.valueClass);
            this.valueSerializer.open((OutputStream)this.dataBuffer);
            this.outputStream = new DataOutputStream(output);
        }

        public void write(K key, V value) throws IOException {
            if (key.getClass() != this.keyClass) {
                throw new IOException("wrong key class: " + key.getClass() + " is not " + this.keyClass);
            }
            if (value.getClass() != this.valueClass) {
                throw new IOException("wrong value class: " + value.getClass() + " is not " + this.valueClass);
            }
            this.keySerializer.serialize(key);
            int keyLength = this.dataBuffer.getLength();
            if (keyLength < 0) {
                throw new IOException("Negative key-length not allowed: " + keyLength + " for " + key);
            }
            this.valueSerializer.serialize(value);
            int valueLength = this.dataBuffer.getLength() - keyLength;
            if (valueLength < 0) {
                throw new IOException("Negative value-length not allowed: " + valueLength + " for " + value);
            }
            WritableUtils.writeVInt((DataOutput)this.outputStream, (int)keyLength);
            WritableUtils.writeVInt((DataOutput)this.outputStream, (int)valueLength);
            this.outputStream.write(this.dataBuffer.getData(), 0, this.dataBuffer.getLength());
            this.dataBuffer.reset();
        }

        public void close() throws IOException {
            this.keySerializer.close();
            this.valueSerializer.close();
            WritableUtils.writeVInt((DataOutput)this.outputStream, (int)-1);
            WritableUtils.writeVInt((DataOutput)this.outputStream, (int)-1);
            this.outputStream.close();
        }
    }

    static class ReduceInputCopier<K, V>
    extends ReduceTask.ReduceCopier {
        private static final int BUF_SIZE = 131072;
        private JobConf jobConf;
        private Reporter reporter;
        private ReduceTask reduceTask;
        private int numParallelCopies;
        private int numMapTasks;
        private Class<K> keyClass;
        private Class<V> valueClass;
        private LinkedList<ByteArrayOutputStream> finishedShuffleStreams;
        private LinkedList<ByteArrayOutputStream> freedShuffleStreams;
        private IOException shuffleException;
        private Thread shuffleThread;
        private ShuffledKeyValueIterator<K, V> shuffledKeyValueIterator;
        private Throwable mergeException;

        public void init(ShuffleConsumerPlugin.Context context) throws ClassNotFoundException, IOException {
            this.jobConf = context.getConf();
            this.reporter = context.getReporter();
            this.reduceTask = context.getReduceTask();
            this.numParallelCopies = this.jobConf.getInt("mapred.reduce.parallel.copies", 5);
            this.numMapTasks = this.jobConf.getNumMapTasks();
            this.keyClass = this.jobConf.getMapOutputKeyClass();
            this.valueClass = this.jobConf.getMapOutputValueClass();
            this.finishedShuffleStreams = new LinkedList();
            this.freedShuffleStreams = new LinkedList();
            for (int i = 0; i < this.numParallelCopies; ++i) {
                ByteArrayOutputStream ostream = new ByteArrayOutputStream();
                this.freedShuffleStreams.addLast(ostream);
            }
            this.shuffleException = null;
            this.shuffleThread = null;
            this.mergeException = null;
            this.shuffledKeyValueIterator = new ShuffledKeyValueIterator();
            super.init(context);
        }

        public boolean fetchOutputs() throws IOException {
            this.shuffleThread = new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        if (!ReduceInputCopier.super.fetchOutputs() && ReduceInputCopier.this.getMergeThrowable() != null) {
                            throw new IOException("Task: " + ReduceInputCopier.this.reduceTask.getTaskID() + " - Reduce copier failed", ReduceInputCopier.this.getMergeThrowable());
                        }
                    }
                    catch (IOException ioe) {
                        ReduceInputCopier.this.shuffleException = ioe;
                    }
                }
            });
            this.shuffleThread.start();
            return true;
        }

        public RawKeyValueIterator createKVIterator(JobConf job, FileSystem fs, Reporter reporter) throws IOException {
            return this.shuffledKeyValueIterator;
        }

        public Throwable getMergeThrowable() {
            return this.mergeException;
        }

        protected void initMerger() throws IOException {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected boolean closeMerger() {
            boolean success;
            boolean bl = success = this.mergeException == null;
            if (success) {
                LinkedList<ByteArrayOutputStream> linkedList = this.finishedShuffleStreams;
                synchronized (linkedList) {
                    this.finishedShuffleStreams.addLast(null);
                    this.finishedShuffleStreams.notify();
                }
            }
            return success;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected ReduceTask.ReduceCopier.MapOutput shuffle(ReduceTask.ReduceCopier.MapOutputCopier copier, ReduceTask.ReduceCopier.MapOutputLocation mapOutputLoc, URLConnection connection, InputStream input, ReduceTask.ReduceCopier.ShuffleClientMetrics shuffleClientMetrics, Path filename, long decompressedLength, long compressedLength, long fileSize, boolean shouldCloseInput) throws IOException, InterruptedException {
            ByteArrayOutputStream output = null;
            LinkedList<ByteArrayOutputStream> linkedList = this.freedShuffleStreams;
            synchronized (linkedList) {
                while (this.freedShuffleStreams.size() <= 0) {
                    if (input != null) {
                        try {
                            input.close();
                        }
                        catch (IOException iOException) {
                        }
                        finally {
                            input = null;
                        }
                    }
                    this.freedShuffleStreams.wait();
                }
                output = this.freedShuffleStreams.removeFirst();
            }
            if (input == null) {
                connection = mapOutputLoc.getOutputLocation().openConnection();
                input = copier.setupSecureConnection(mapOutputLoc, connection);
            }
            ShuffledMapOutput mapOutput = new ShuffledMapOutput(output);
            mapOutput.shuffle(input, decompressedLength, compressedLength, shuffleClientMetrics, this.reporter);
            return new ReduceTask.ReduceCopier.MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), new byte[0], (int)compressedLength);
        }

        private class ShuffledMapOutput {
            private ByteArrayOutputStream outputStream;

            public ShuffledMapOutput(ByteArrayOutputStream output) throws IOException {
                this.outputStream = output;
                this.outputStream.reset();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void shuffle(InputStream input, long decompressedLength, long compressedLength, ReduceTask.ReduceCopier.ShuffleClientMetrics metrics, Reporter reporter) throws IOException {
                int numBytesRead;
                byte[] buf = new byte[131072];
                long totalNumBytesToRead = decompressedLength;
                int numBytesToRead = (int)Math.min(131072L, totalNumBytesToRead);
                while (numBytesToRead > 0 && (numBytesRead = input.read(buf, 0, numBytesToRead)) > 0) {
                    reporter.progress();
                    this.outputStream.write(buf, 0, numBytesRead);
                    numBytesToRead = (int)Math.min(131072L, totalNumBytesToRead -= (long)numBytesRead);
                }
                if (numBytesToRead != 0) {
                    throw new IOException("EOF before shuffle completion");
                }
                metrics.inputBytes(decompressedLength);
                LinkedList linkedList = ReduceInputCopier.this.finishedShuffleStreams;
                synchronized (linkedList) {
                    ReduceInputCopier.this.finishedShuffleStreams.addLast(this.outputStream);
                    ReduceInputCopier.this.finishedShuffleStreams.notify();
                }
            }
        }

        private class ShuffledKeyValueIterator<K, V>
        implements RawKeyValueIterator {
            private boolean firstTime = true;
            private boolean moreToGo = false;
            private KeyValueReader<K, V> currentReader = null;
            private ByteArrayOutputStream currentOutputStream = null;
            private int readerNumber = 0;
            private float progressPerMap;
            private Progress progress;

            public ShuffledKeyValueIterator() {
                this.progressPerMap = (float)(1.0 / (double)ReduceInputCopier.this.numMapTasks);
                this.progress = new Progress();
                this.progress.set((float)this.readerNumber * this.progressPerMap);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public boolean next() throws IOException {
                ReduceInputCopier.this.reporter.progress();
                boolean nextExists = false;
                if (this.firstTime) {
                    this.currentReader = this.getNextReader();
                    this.firstTime = false;
                    boolean bl = this.moreToGo = this.currentReader != null;
                }
                if (this.moreToGo) {
                    do {
                        if (this.currentReader == null) {
                            this.currentReader = this.getNextReader();
                            if (this.currentReader == null) {
                                this.moreToGo = false;
                                break;
                            }
                        }
                        if (nextExists = this.currentReader.next()) continue;
                        this.currentReader.close();
                        this.currentReader = null;
                        LinkedList linkedList = ReduceInputCopier.this.freedShuffleStreams;
                        synchronized (linkedList) {
                            ReduceInputCopier.this.freedShuffleStreams.addLast(this.currentOutputStream);
                            ReduceInputCopier.this.freedShuffleStreams.notify();
                        }
                    } while (this.currentReader == null);
                }
                return nextExists;
            }

            public DataInputBuffer getKey() throws IOException {
                ReduceInputCopier.this.reporter.progress();
                if (this.moreToGo) {
                    return this.currentReader.getKey();
                }
                throw new NoSuchElementException();
            }

            public DataInputBuffer getValue() throws IOException {
                ReduceInputCopier.this.reporter.progress();
                if (this.moreToGo) {
                    return this.currentReader.getValue();
                }
                throw new NoSuchElementException();
            }

            public Progress getProgress() {
                return this.progress;
            }

            public void close() throws IOException {
                if (this.moreToGo) {
                    this.moreToGo = false;
                    this.currentReader.close();
                    ReduceInputCopier.this.mergeException = new IOException("Premature close() of output record reader");
                }
                if (ReduceInputCopier.this.shuffleThread != null) {
                    try {
                        ReduceInputCopier.this.shuffleThread.join();
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    ReduceInputCopier.this.shuffleThread = null;
                    if (ReduceInputCopier.this.shuffleException != null) {
                        throw ReduceInputCopier.this.shuffleException;
                    }
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private KeyValueReader<K, V> getNextReader() throws IOException {
                LinkedList linkedList = ReduceInputCopier.this.finishedShuffleStreams;
                synchronized (linkedList) {
                    while (ReduceInputCopier.this.finishedShuffleStreams.size() <= 0) {
                        try {
                            ReduceInputCopier.this.finishedShuffleStreams.wait();
                        }
                        catch (InterruptedException ie) {
                            throw new IOException("Wait interrupted", ie);
                        }
                    }
                    this.currentOutputStream = (ByteArrayOutputStream)ReduceInputCopier.this.finishedShuffleStreams.removeFirst();
                }
                KeyValueReader reader = null;
                if (this.currentOutputStream != null) {
                    byte[] b = this.currentOutputStream.toByteArray();
                    ByteArrayInputStream inputStream = new ByteArrayInputStream(b);
                    reader = new KeyValueReader((Configuration)ReduceInputCopier.this.jobConf, inputStream, ReduceInputCopier.this.keyClass, ReduceInputCopier.this.valueClass);
                }
                ++this.readerNumber;
                this.progress.set((float)this.readerNumber * this.progressPerMap);
                return reader;
            }
        }
    }

    static class MapOutputCopier<K, V>
    implements MapOutputCollector<K, V> {
        private static final int BUF_SIZE = 131072;
        private MapTask mapTask;
        private JobConf jobConf;
        private Task.TaskReporter reporter;
        private int numberOfPartitions;
        private Class<K> keyClass;
        private Class<V> valueClass;
        private KeyValueWriter<K, V>[] recordWriters;
        private ByteArrayOutputStream[] outStreams;

        public void init(MapOutputCollector.Context context) throws IOException, ClassNotFoundException {
            this.mapTask = context.getMapTask();
            this.jobConf = context.getJobConf();
            this.reporter = context.getReporter();
            this.numberOfPartitions = this.jobConf.getNumReduceTasks();
            this.keyClass = this.jobConf.getMapOutputKeyClass();
            this.valueClass = this.jobConf.getMapOutputValueClass();
            this.recordWriters = new KeyValueWriter[this.numberOfPartitions];
            this.outStreams = new ByteArrayOutputStream[this.numberOfPartitions];
            for (int i = 0; i < this.numberOfPartitions; ++i) {
                this.outStreams[i] = new ByteArrayOutputStream();
                this.recordWriters[i] = new KeyValueWriter<K, V>((Configuration)this.jobConf, this.outStreams[i], this.keyClass, this.valueClass);
            }
        }

        public synchronized void collect(K key, V value, int partitionNumber) throws IOException, InterruptedException {
            if (partitionNumber < 0 || partitionNumber >= this.numberOfPartitions) {
                throw new IOException("Invalid partition number: " + partitionNumber);
            }
            this.recordWriters[partitionNumber].write(key, value);
            this.reporter.progress();
        }

        public void close() throws IOException, InterruptedException {
            long totalSize = 0L;
            for (int i = 0; i < this.numberOfPartitions; ++i) {
                this.recordWriters[i].close();
                this.outStreams[i].close();
                totalSize += (long)this.outStreams[i].size();
            }
            MapReduceLocalData mapOutputFile = this.mapTask.getMapOutputFile();
            Path finalOutput = mapOutputFile.getOutputFileForWrite(totalSize);
            Path indexPath = mapOutputFile.getOutputIndexFileForWrite((long)(this.numberOfPartitions * 24));
            this.copyPartitions(finalOutput, indexPath);
        }

        public void flush() throws IOException, InterruptedException, ClassNotFoundException {
        }

        private void copyPartitions(Path mapOutputPath, Path indexPath) throws IOException {
            LocalFileSystem localFs = FileSystem.getLocal((Configuration)this.jobConf);
            FileSystem rfs = localFs.getRaw();
            FSDataOutputStream output = rfs.create(mapOutputPath, true, 131072);
            SpillRecord spillRecord = new SpillRecord(this.numberOfPartitions);
            IndexRecord indexRecord = new IndexRecord();
            indexRecord.startOffset = 0L;
            for (int i = 0; i < this.numberOfPartitions; ++i) {
                byte[] buffer = this.outStreams[i].toByteArray();
                output.write(buffer);
                output.flush();
                long newFileSize = output.getPos();
                indexRecord.rawLength = buffer.length;
                indexRecord.partLength = newFileSize - indexRecord.startOffset;
                spillRecord.putIndex(indexRecord, i);
                indexRecord.startOffset = newFileSize;
                this.reporter.progress();
            }
            output.close();
            spillRecord.writeToFile(indexPath, this.jobConf);
        }
    }

    public static class LimitNReducer
    implements Reducer<Text, Text, Text, Text> {
        private int recordLimit = 0;
        private Writer writer = null;

        public void configure(JobConf jobConf) {
            this.recordLimit = jobConf.getInt(TestLimitNQuery.RECORD_LIMIT_ATTR, 0);
            try {
                FileSystem fileSystem = FileSystem.get((Configuration)jobConf);
                FSDataOutputStream os = fileSystem.create(new Path(REAL_OUTPUT, "finalOut"));
                this.writer = new OutputStreamWriter((OutputStream)os);
            }
            catch (IOException ioe) {
                System.err.println("Reducer: IOException: " + ioe);
            }
        }

        public void reduce(Text key, Iterator<Text> it, OutputCollector<Text, Text> out, Reporter reporter) throws IOException {
            while (it.hasNext() && this.recordLimit > 0) {
                if (this.writer != null) {
                    this.writer.write(key.toString() + " " + it.next().toString() + "\n");
                }
                --this.recordLimit;
                reporter.progress();
            }
            if (this.recordLimit <= 0) {
                this.closeOutput();
                reporter.progress();
                System.err.println("Record limit reached.  Job successful");
                System.exit(1);
            }
        }

        public void close() {
            try {
                this.closeOutput();
            }
            catch (IOException ioe) {
                System.err.println("Reducer: Error closing output" + ioe);
            }
        }

        private void closeOutput() throws IOException {
            if (this.writer != null) {
                this.writer.flush();
                this.writer.close();
                this.writer = null;
            }
        }
    }

    public static class MapperWithConditionBasedFiltering
    extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
        private Text keyText = new Text();
        private Text valueText = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String record = value.toString();
            int blankPos = record.indexOf(" ");
            String keyString = record.substring(0, blankPos);
            int keyValue = 0;
            try {
                keyValue = Integer.parseInt(keyString);
            }
            catch (NumberFormatException numberFormatException) {
                // empty catch block
            }
            if (keyValue >= 50) {
                this.keyText.set(keyString);
                this.valueText.set(record.substring(blankPos + 1));
                output.collect((Object)this.keyText, (Object)this.valueText);
            }
        }

        public void close() throws IOException {
        }
    }
}

