package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URLConnection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.MapOutputCollector;
import org.apache.hadoop.mapred.ReduceTask;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.util.Progress;

/* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery.class */
public class TestLimitNQuery extends TestCase {
    // NOTE(review): the methods visible in this file use inlined literal values rather than
    // the int constants below, so the pairing described for each int constant is presumed
    // from its name and value — confirm against the original source.
    // Presumably the data-node count handed to MiniDFSCluster in testLimitNQuery().
    private static final int NUM_HADOOP_DATA_NODES = 2;
    // Presumably the number of input files written by createInput() (one per mapper).
    private static final int NUM_MAPPERS = 50;
    // Presumably the number of lines written to each input file by createInput().
    private static final int NUM_LINES = 1000;
    // Directory that createInput() clears and then fills with the generated input files.
    private static final Path INPUT_DIR = new Path("/testplugin/input");
    // Deleted at the start of runLimitNQueryTest(); its further use is outside the visible code.
    private static final Path DUMMY_OUTPUT = new Path("/testplugin/dummy");
    // Directory under which LimitNReducer.configure() creates its "finalOut" file.
    private static final Path REAL_OUTPUT = new Path("/testplugin/output");
    // Job-configuration key that LimitNReducer.configure() reads the record limit from.
    private static final String RECORD_LIMIT_ATTR = "record.limit.count";
    // Presumably the value stored under RECORD_LIMIT_ATTR by the job setup — TODO confirm.
    private static final int RECORD_LIMIT = 10;
    // Presumably the mapper's acceptance threshold (the mapper compares against literal 50).
    private static final int MIN_KEY_VALUE_TO_ACCEPT = 50;
    // Presumably the exclusive upper bound of the random keys generated by createInput().
    private static final int MAX_KEY_VALUE = 1000;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery$KeyValueReader.class */
    /**
     * Sequential reader for a stream of length-prefixed key/value records.
     *
     * <p>Each record is two vints (key length, value length) followed by the key bytes and
     * the value bytes. A pair of {@code -1} lengths marks the end of the stream. The raw
     * bytes of the current record are exposed through reusable {@link DataInputBuffer}s;
     * nothing is deserialized here.
     */
    public static class KeyValueReader<K, V> {
        private Class<K> keyClass;
        private Class<V> valueClass;
        private DataInputStream inputStream;
        // Not referenced by any method of this class as visible here.
        private K key;
        private V value;
        // Backing arrays for the current record; grown on demand and reused across records.
        private byte[] keyStorage = new byte[0];
        private byte[] valueStorage = new byte[0];
        private DataInputBuffer keyBuffer = new DataInputBuffer();
        private DataInputBuffer valueBuffer = new DataInputBuffer();
        // Turns false once the end marker has been read or close() has been called.
        private boolean moreToGo = true;

        /**
         * @param configuration not used by this reader
         * @param inputStream   source of the length-prefixed records
         * @param cls           key class (stored only)
         * @param cls2          value class (stored only)
         */
        public KeyValueReader(Configuration configuration, InputStream inputStream, Class<K> cls, Class<V> cls2) throws IOException {
            this.keyClass = cls;
            this.valueClass = cls2;
            this.inputStream = new DataInputStream(inputStream);
        }

        /**
         * Advances to the next record.
         *
         * @return {@code true} if a record was loaded, {@code false} once the end marker has
         *         been seen (and on every later call)
         * @throws IOException if the stream ends inside a record or a record header carries a
         *                     negative length other than the end marker
         */
        public boolean next() throws IOException {
            if (!this.moreToGo) {
                return false;
            }
            int keyLength = WritableUtils.readVInt(this.inputStream);
            int valueLength = WritableUtils.readVInt(this.inputStream);
            if (keyLength == -1 && valueLength == -1) {
                this.moreToGo = false;
                return false;
            }
            if (keyLength < 0 || valueLength < 0) {
                throw new IOException("Corrupt record header: key length " + keyLength + ", value length " + valueLength);
            }
            // Over-allocate (2x) so that slightly larger follow-up records reuse the array.
            if (this.keyStorage.length < keyLength) {
                this.keyStorage = new byte[keyLength << 1];
            }
            if (this.valueStorage.length < valueLength) {
                this.valueStorage = new byte[valueLength << 1];
            }
            try {
                // read() may legally return fewer bytes than requested even when more data
                // follows; readFully() loops until the whole record is in memory.
                this.inputStream.readFully(this.keyStorage, 0, keyLength);
                this.inputStream.readFully(this.valueStorage, 0, valueLength);
            } catch (java.io.EOFException e) {
                throw new IOException("Error reading key and value.  Premature end of input", e);
            }
            this.keyBuffer.reset(this.keyStorage, keyLength);
            this.valueBuffer.reset(this.valueStorage, valueLength);
            return true;
        }

        /**
         * @return buffer positioned over the raw bytes of the current key
         * @throws NoSuchElementException if the reader is exhausted or closed
         */
        public DataInputBuffer getKey() throws IOException {
            if (this.moreToGo) {
                return this.keyBuffer;
            }
            throw new NoSuchElementException();
        }

        /**
         * @return buffer positioned over the raw bytes of the current value
         * @throws NoSuchElementException if the reader is exhausted or closed
         */
        public DataInputBuffer getValue() throws IOException {
            if (this.moreToGo) {
                return this.valueBuffer;
            }
            throw new NoSuchElementException();
        }

        /** Progress is not tracked by this reader; always returns {@code null}. */
        public Progress getProgress() {
            return null;
        }

        /** Marks the reader as exhausted and closes the underlying stream. */
        public void close() throws IOException {
            this.moreToGo = false;
            this.inputStream.close();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery$KeyValueWriter.class */
    /**
     * Writes key/value pairs as length-prefixed records: vint key length, vint value length,
     * then the serialized key bytes followed by the serialized value bytes. {@link #close()}
     * terminates the stream with a pair of {@code -1} lengths.
     */
    static class KeyValueWriter<K, V> {
        private Class<K> keyClass;
        private Class<V> valueClass;
        // Scratch buffer both serializers write into; emptied after every record.
        private DataOutputBuffer dataBuffer = new DataOutputBuffer();
        private Serializer<K> keySerializer;
        private Serializer<V> valueSerializer;
        private DataOutputStream outputStream;

        /**
         * @param configuration used to look up the serializers for the two classes
         * @param outputStream  destination of the encoded records
         * @param cls           exact class every key must have
         * @param cls2          exact class every value must have
         */
        public KeyValueWriter(Configuration configuration, OutputStream outputStream, Class<K> cls, Class<V> cls2) throws IOException {
            this.keyClass = cls;
            this.valueClass = cls2;

            SerializationFactory factory = new SerializationFactory(configuration);

            this.keySerializer = factory.getSerializer(this.keyClass);
            this.keySerializer.open(this.dataBuffer);

            this.valueSerializer = factory.getSerializer(this.valueClass);
            this.valueSerializer.open(this.dataBuffer);

            this.outputStream = new DataOutputStream(outputStream);
        }

        /**
         * Appends one record.
         *
         * @throws IOException if the key or value is not exactly of the configured class, or
         *                     a computed length is negative
         */
        public void write(K k, V v) throws IOException {
            Class<?> actualKeyClass = k.getClass();
            if (actualKeyClass != this.keyClass) {
                throw new IOException("wrong key class: " + actualKeyClass + " is not " + this.keyClass);
            }
            Class<?> actualValueClass = v.getClass();
            if (actualValueClass != this.valueClass) {
                throw new IOException("wrong value class: " + actualValueClass + " is not " + this.valueClass);
            }

            // Key first: the buffer length after this step is the key length.
            this.keySerializer.serialize(k);
            final int keyLength = this.dataBuffer.getLength();
            if (keyLength < 0) {
                throw new IOException("Negative key-length not allowed: " + keyLength + " for " + k);
            }

            // The value is appended to the same buffer; its length is the growth.
            this.valueSerializer.serialize(v);
            final int valueLength = this.dataBuffer.getLength() - keyLength;
            if (valueLength < 0) {
                throw new IOException("Negative value-length not allowed: " + valueLength + " for " + v);
            }

            WritableUtils.writeVInt(this.outputStream, keyLength);
            WritableUtils.writeVInt(this.outputStream, valueLength);
            this.outputStream.write(this.dataBuffer.getData(), 0, keyLength + valueLength);
            this.dataBuffer.reset();
        }

        /** Closes both serializers, writes the (-1, -1) end marker and closes the stream. */
        public void close() throws IOException {
            this.keySerializer.close();
            this.valueSerializer.close();
            for (int marker = 0; marker < 2; marker++) {
                WritableUtils.writeVInt(this.outputStream, -1);
            }
            this.outputStream.close();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery$LimitNReducer.class */
    /**
     * Reducer that writes at most a configured number of "key value" lines to a side file
     * and terminates the JVM as soon as that limit has been reached.
     */
    public static class LimitNReducer implements Reducer<Text, Text, Text, Text> {
        // Records still allowed to be written; counts down to zero.
        private int recordLimit = 0;
        // Side-file writer; stays null when the file could not be created in configure().
        private Writer writer = null;

        /**
         * Reads the record limit from the job configuration and opens the "finalOut" file
         * under the real output directory. A failure to open the file is only reported on
         * stderr.
         */
        public void configure(JobConf jobConf) {
            this.recordLimit = jobConf.getInt(TestLimitNQuery.RECORD_LIMIT_ATTR, 0);
            try {
                FileSystem fs = FileSystem.get(jobConf);
                Path finalOut = new Path(TestLimitNQuery.REAL_OUTPUT, "finalOut");
                this.writer = new OutputStreamWriter(fs.create(finalOut));
            } catch (IOException e) {
                System.err.println("Reducer: IOException: " + e);
            }
        }

        /**
         * Writes values of the given key until the values or the remaining budget run out.
         * Once the budget is used up the output is closed and the JVM exits with status 1.
         */
        public void reduce(Text text, Iterator<Text> it, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
            while (it.hasNext() && this.recordLimit > 0) {
                if (this.writer != null) {
                    String line = text.toString() + " " + it.next().toString() + "\n";
                    this.writer.write(line);
                }
                this.recordLimit--;
                reporter.progress();
            }

            if (this.recordLimit > 0) {
                return;
            }

            closeOutput();
            reporter.progress();
            System.err.println("Record limit reached.  Job successful");
            System.exit(1);
        }

        /** Closes the side file; a failure is only reported on stderr. */
        public void close() {
            try {
                closeOutput();
            } catch (IOException e) {
                System.err.println("Reducer: Error closing output" + e);
            }
        }

        /** Flushes and closes the writer once; later calls are no-ops. */
        private void closeOutput() throws IOException {
            if (this.writer == null) {
                return;
            }
            this.writer.flush();
            this.writer.close();
            this.writer = null;
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterator it, OutputCollector outputCollector, Reporter reporter) throws IOException {
            reduce((Text) obj, (Iterator<Text>) it, (OutputCollector<Text, Text>) outputCollector, reporter);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery$MapOutputCopier.class */
    /**
     * Map-output collector that keeps every partition in memory as a length-prefixed record
     * stream and, on {@link #close()}, concatenates the partitions into the map output file
     * and writes the matching spill index.
     */
    static class MapOutputCopier<K, V> implements MapOutputCollector<K, V> {
        private static final int BUF_SIZE = 131072;
        // Bytes reserved per partition in the index file. IndexRecord carries three long
        // fields here (startOffset, rawLength, partLength); presumably this mirrors
        // MapTask's index-record length constant — TODO confirm.
        private static final int INDEX_RECORD_LENGTH = 24;
        private MapTask mapTask;
        private JobConf jobConf;
        private Task.TaskReporter reporter;
        private int numberOfPartitions;
        private Class<K> keyClass;
        private Class<V> valueClass;
        // One writer and one backing in-memory stream per partition, indexed by partition.
        private KeyValueWriter<K, V>[] recordWriters;
        private ByteArrayOutputStream[] outStreams;

        /**
         * Captures the task context and creates one in-memory record writer per reduce
         * partition.
         */
        public void init(MapOutputCollector.Context context) throws IOException, ClassNotFoundException {
            this.mapTask = context.getMapTask();
            this.jobConf = context.getJobConf();
            this.reporter = context.getReporter();
            this.numberOfPartitions = this.jobConf.getNumReduceTasks();
            this.keyClass = this.jobConf.getMapOutputKeyClass();
            this.valueClass = this.jobConf.getMapOutputValueClass();
            this.recordWriters = new KeyValueWriter[this.numberOfPartitions];
            this.outStreams = new ByteArrayOutputStream[this.numberOfPartitions];
            for (int i = 0; i < this.numberOfPartitions; i++) {
                this.outStreams[i] = new ByteArrayOutputStream();
                this.recordWriters[i] = new KeyValueWriter<>(this.jobConf, this.outStreams[i], this.keyClass, this.valueClass);
            }
        }

        /**
         * Appends one record to the given partition.
         *
         * @throws IOException if the partition number is out of range
         */
        public synchronized void collect(K k, V v, int i) throws IOException, InterruptedException {
            if (i < 0 || i >= this.numberOfPartitions) {
                throw new IOException("Invalid partition number: " + i);
            }
            this.recordWriters[i].write(k, v);
            this.reporter.progress();
        }

        /**
         * Finishes every partition stream, then writes the combined output file and its
         * index to the locations handed out by the map task.
         */
        public void close() throws IOException, InterruptedException {
            long totalBytes = 0;
            for (int i = 0; i < this.numberOfPartitions; i++) {
                // Closing the writer appends the end marker, so size() is final afterwards.
                this.recordWriters[i].close();
                this.outStreams[i].close();
                totalBytes += this.outStreams[i].size();
            }
            MapReduceLocalData mapOutputFile = this.mapTask.getMapOutputFile();
            Path outputFileForWrite = mapOutputFile.getOutputFileForWrite(totalBytes);
            copyPartitions(outputFileForWrite, mapOutputFile.getOutputIndexFileForWrite(this.numberOfPartitions * INDEX_RECORD_LENGTH));
        }

        /** Nothing to flush: all data stays in memory until {@link #close()}. */
        public void flush() throws IOException, InterruptedException, ClassNotFoundException {
        }

        /**
         * Writes the partitions back to back into {@code path} on the raw local file system
         * and records each partition's offset and lengths in the index file {@code path2}.
         */
        private void copyPartitions(Path path, Path path2) throws IOException {
            SpillRecord spillRecord = new SpillRecord(this.numberOfPartitions);
            IndexRecord indexRecord = new IndexRecord();
            indexRecord.startOffset = 0L;
            // try-with-resources closes the file even when a write fails part-way through.
            try (FSDataOutputStream create = FileSystem.getLocal(this.jobConf).getRaw().create(path, true, BUF_SIZE)) {
                for (int i = 0; i < this.numberOfPartitions; i++) {
                    byte[] partitionBytes = this.outStreams[i].toByteArray();
                    create.write(partitionBytes);
                    create.flush();
                    long pos = create.getPos();
                    indexRecord.rawLength = partitionBytes.length;
                    indexRecord.partLength = pos - indexRecord.startOffset;
                    spillRecord.putIndex(indexRecord, i);
                    // The next partition starts where this one ended.
                    indexRecord.startOffset = pos;
                    this.reporter.progress();
                }
            }
            spillRecord.writeToFile(path2, this.jobConf);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery$MapperWithConditionBasedFiltering.class */
    /**
     * Mapper that splits each input line at its first space into key and value and forwards
     * the pair only when the key parses as an integer that reaches the acceptance threshold.
     */
    public static class MapperWithConditionBasedFiltering extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        // Reused for every emitted record.
        private Text keyText = new Text();
        private Text valueText = new Text();

        /**
         * @param longWritable    input key supplied by the framework; not used
         * @param text            one input line, expected as {@code "<int> <rest>"}
         * @param outputCollector receives the accepted (key, rest-of-line) pairs
         * @param reporter        not used
         */
        public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
            String line = text.toString();
            int separator = line.indexOf(' ');
            if (separator < 0) {
                // No key/value separator: skip the line like any other unparsable input
                // instead of failing the task with StringIndexOutOfBoundsException.
                return;
            }
            String keyPart = line.substring(0, separator);
            int keyValue;
            try {
                keyValue = Integer.parseInt(keyPart);
            } catch (NumberFormatException ignored) {
                // A non-numeric key is treated as 0.
                keyValue = 0;
            }
            if (keyValue >= TestLimitNQuery.MIN_KEY_VALUE_TO_ACCEPT) {
                this.keyText.set(keyPart);
                this.valueText.set(line.substring(separator + 1));
                outputCollector.collect(this.keyText, this.valueText);
            }
        }

        /** Nothing to release. */
        public void close() throws IOException {
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, OutputCollector outputCollector, Reporter reporter) throws IOException {
            map((LongWritable) obj, (Text) obj2, (OutputCollector<Text, Text>) outputCollector, reporter);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery$ReduceInputCopier.class */
    public static class ReduceInputCopier<K, V> extends ReduceTask.ReduceCopier {
        // Size of the transfer buffer used by ShuffledMapOutput.shuffle().
        private static final int BUF_SIZE = 131072;
        // Context values captured in init().
        private JobConf jobConf;
        private Reporter reporter;
        private ReduceTask reduceTask;
        // Read from "mapred.reduce.parallel.copies" in init(); also the number of buffers
        // initially placed in freedShuffleStreams.
        private int numParallelCopies;
        private int numMapTasks;
        private Class<K> keyClass;
        private Class<V> valueClass;
        // Buffers holding a fully copied map output, waiting to be read by the iterator.
        // closeMerger() appends a null entry; the iterator treats it as "no more input".
        // Also used as the monitor for wait()/notify() between producers and the iterator.
        private LinkedList<ByteArrayOutputStream> finishedShuffleStreams;
        // Buffers available for the next copy; shuffle() waits on this list when it is empty
        // and the iterator returns a buffer here once its records have been consumed.
        private LinkedList<ByteArrayOutputStream> freedShuffleStreams;
        // IOException caught on the fetch thread; rethrown by the iterator's close().
        private IOException shuffleException;
        // Background thread started by fetchOutputs(); joined by the iterator's close().
        private Thread shuffleThread;
        // Iterator instance created in init() and returned by createKVIterator().
        private ReduceInputCopier<K, V>.ShuffledKeyValueIterator<K, V> shuffledKeyValueIterator;
        // Set when the iterator is closed while records remain; reported by getMergeThrowable().
        private Throwable mergeException;

        /* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery$ReduceInputCopier$ShuffledKeyValueIterator.class */
        /**
         * Iterator over the records of all copied map outputs, in the order the copies
         * finish. It takes buffers from {@code finishedShuffleStreams}, reads them record by
         * record, and returns each exhausted buffer to {@code freedShuffleStreams} so the
         * copier threads can reuse it.
         */
        private class ShuffledKeyValueIterator<K, V> implements RawKeyValueIterator {
            // Progress contributed by each map output (1 / number of map tasks).
            private float progressPerMap;
            // True until the first call to next() has fetched the first reader.
            private boolean firstTime = true;
            // True while records may still be available.
            private boolean moreToGo = false;
            private KeyValueReader<K, V> currentReader = null;
            // Buffer backing currentReader; handed back to the free pool when exhausted.
            private ByteArrayOutputStream currentOutputStream = null;
            private int readerNumber = 0;
            private Progress progress = new Progress();

            public ShuffledKeyValueIterator() {
                this.progressPerMap = (float) (1.0d / ReduceInputCopier.this.numMapTasks);
                this.progress.set(this.readerNumber * this.progressPerMap);
            }

            /**
             * Advances to the next record, moving on to the next finished map output when
             * the current one is exhausted. Blocks while no finished output is available.
             *
             * @return {@code true} if a record is available through {@link #getKey()} and
             *         {@link #getValue()}, {@code false} once all input has been consumed
             */
            public boolean next() throws IOException {
                ReduceInputCopier.this.reporter.progress();
                if (this.firstTime) {
                    this.currentReader = getNextReader();
                    this.firstTime = false;
                    this.moreToGo = this.currentReader != null;
                }
                if (!this.moreToGo) {
                    return false;
                }
                boolean hasRecord;
                do {
                    if (this.currentReader == null) {
                        this.currentReader = getNextReader();
                        if (this.currentReader == null) {
                            // End-of-input marker reached: nothing left to read.
                            this.moreToGo = false;
                            return false;
                        }
                    }
                    hasRecord = this.currentReader.next();
                    if (!hasRecord) {
                        // Current map output is exhausted: release its buffer to the copiers
                        // and try the next finished output.
                        this.currentReader.close();
                        this.currentReader = null;
                        synchronized (ReduceInputCopier.this.freedShuffleStreams) {
                            ReduceInputCopier.this.freedShuffleStreams.addLast(this.currentOutputStream);
                            ReduceInputCopier.this.freedShuffleStreams.notify();
                        }
                    }
                } while (this.currentReader == null);
                return hasRecord;
            }

            /**
             * @return raw bytes of the current key
             * @throws NoSuchElementException if no record is available
             */
            public DataInputBuffer getKey() throws IOException {
                ReduceInputCopier.this.reporter.progress();
                if (this.moreToGo) {
                    return this.currentReader.getKey();
                }
                throw new NoSuchElementException();
            }

            /**
             * @return raw bytes of the current value
             * @throws NoSuchElementException if no record is available
             */
            public DataInputBuffer getValue() throws IOException {
                ReduceInputCopier.this.reporter.progress();
                if (this.moreToGo) {
                    return this.currentReader.getValue();
                }
                throw new NoSuchElementException();
            }

            /** @return progress, advanced by one map's share each time a new reader is fetched */
            public Progress getProgress() {
                return this.progress;
            }

            /**
             * Closes the iterator. Closing while records remain is recorded as a merge
             * error. Afterwards the fetch thread, if any, is joined and an IOException it
             * stored is rethrown.
             */
            public void close() throws IOException {
                if (this.moreToGo) {
                    this.moreToGo = false;
                    // The reader can be null if an earlier next() failed between readers.
                    if (this.currentReader != null) {
                        this.currentReader.close();
                    }
                    ReduceInputCopier.this.mergeException = new IOException("Premature close() of output record reader");
                }
                if (ReduceInputCopier.this.shuffleThread != null) {
                    try {
                        ReduceInputCopier.this.shuffleThread.join();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the caller instead of dropping it.
                        Thread.currentThread().interrupt();
                    }
                    ReduceInputCopier.this.shuffleThread = null;
                    if (ReduceInputCopier.this.shuffleException != null) {
                        throw ReduceInputCopier.this.shuffleException;
                    }
                }
            }

            /**
             * Waits for the next entry of {@code finishedShuffleStreams} and wraps it in a
             * reader.
             *
             * @return a reader over the next finished map output, or {@code null} when the
             *         entry taken from the list was the null end-of-input marker
             * @throws IOException if the wait is interrupted
             */
            private KeyValueReader<K, V> getNextReader() throws IOException {
                synchronized (ReduceInputCopier.this.finishedShuffleStreams) {
                    while (ReduceInputCopier.this.finishedShuffleStreams.size() <= 0) {
                        try {
                            ReduceInputCopier.this.finishedShuffleStreams.wait();
                        } catch (InterruptedException e) {
                            throw new IOException("Wait interrupted", e);
                        }
                    }
                    this.currentOutputStream = (ByteArrayOutputStream) ReduceInputCopier.this.finishedShuffleStreams.removeFirst();
                }
                KeyValueReader<K, V> keyValueReader = null;
                if (this.currentOutputStream != null) {
                    keyValueReader = new KeyValueReader<>(ReduceInputCopier.this.jobConf, new ByteArrayInputStream(this.currentOutputStream.toByteArray()), ReduceInputCopier.this.keyClass, ReduceInputCopier.this.valueClass);
                }
                this.readerNumber++;
                this.progress.set(this.readerNumber * this.progressPerMap);
                return keyValueReader;
            }
        }

        /* loaded from: input_file:org/apache/hadoop/mapred/TestLimitNQuery$ReduceInputCopier$ShuffledMapOutput.class */
        /**
         * Copies one map output from an input stream into a pooled in-memory buffer and
         * publishes the filled buffer on {@code finishedShuffleStreams}.
         */
        private class ShuffledMapOutput {
            private ByteArrayOutputStream outputStream;

            /** Takes over the given buffer and clears whatever it held before. */
            public ShuffledMapOutput(ByteArrayOutputStream byteArrayOutputStream) throws IOException {
                this.outputStream = byteArrayOutputStream;
                this.outputStream.reset();
            }

            /**
             * Reads {@code j} bytes from the stream into the buffer, then hands the buffer
             * to the iterator side.
             *
             * @param inputStream          source of the map output bytes
             * @param j                    number of bytes to copy
             * @param j2                   not used by this method
             * @param shuffleClientMetrics told the number of bytes after a complete copy
             * @param reporter             pinged after every chunk read
             * @throws IOException if the stream ends before {@code j} bytes were read
             */
            public void shuffle(InputStream inputStream, long j, long j2, ReduceTask.ReduceCopier.ShuffleClientMetrics shuffleClientMetrics, Reporter reporter) throws IOException {
                final byte[] chunkBuffer = new byte[ReduceInputCopier.BUF_SIZE];
                long remaining = j;
                int chunk = (int) Math.min((long) ReduceInputCopier.BUF_SIZE, remaining);
                while (chunk > 0) {
                    int count = inputStream.read(chunkBuffer, 0, chunk);
                    if (count <= 0) {
                        break;
                    }
                    reporter.progress();
                    this.outputStream.write(chunkBuffer, 0, count);
                    remaining -= count;
                    chunk = (int) Math.min((long) ReduceInputCopier.BUF_SIZE, remaining);
                }
                // A non-zero chunk size here means the loop stopped before everything arrived.
                if (chunk != 0) {
                    throw new IOException("EOF before shuffle completion");
                }

                shuffleClientMetrics.inputBytes(j);

                final LinkedList<ByteArrayOutputStream> finished = ReduceInputCopier.this.finishedShuffleStreams;
                synchronized (finished) {
                    finished.addLast(this.outputStream);
                    finished.notify();
                }
            }
        }

        /**
         * Captures the task context, reads the copier settings from the job configuration,
         * pre-allocates one free buffer per parallel copy, resets the error state, creates
         * the streaming iterator and finally delegates to the superclass initialisation.
         */
        public void init(ShuffleConsumerPlugin.Context context) throws ClassNotFoundException, IOException {
            // Context-derived state.
            this.jobConf = context.getConf();
            this.reporter = context.getReporter();
            this.reduceTask = context.getReduceTask();

            // Job settings.
            this.numParallelCopies = this.jobConf.getInt("mapred.reduce.parallel.copies", 5);
            this.numMapTasks = this.jobConf.getNumMapTasks();
            this.keyClass = this.jobConf.getMapOutputKeyClass();
            this.valueClass = this.jobConf.getMapOutputValueClass();

            // Buffer pools: nothing finished yet, one free buffer per parallel copy.
            this.finishedShuffleStreams = new LinkedList<>();
            this.freedShuffleStreams = new LinkedList<>();
            int buffersToCreate = this.numParallelCopies;
            while (buffersToCreate > 0) {
                this.freedShuffleStreams.addLast(new ByteArrayOutputStream());
                buffersToCreate--;
            }

            // No fetch thread and no recorded errors at start.
            this.shuffleException = null;
            this.shuffleThread = null;
            this.mergeException = null;

            // The iterator's constructor reads numMapTasks, so it is created after it is set.
            this.shuffledKeyValueIterator = new ShuffledKeyValueIterator<>();
            super.init(context);
        }

        /**
         * Starts the superclass fetch on a background thread and returns at once. An
         * IOException raised on that thread (including the one created here when the fetch
         * reports failure and a merge error is recorded) is stored in
         * {@code shuffleException}.
         *
         * @return always {@code true}
         */
        public boolean fetchOutputs() throws IOException {
            Runnable fetchTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        boolean fetched = ReduceInputCopier.super.fetchOutputs();
                        if (!fetched && ReduceInputCopier.this.getMergeThrowable() != null) {
                            throw new IOException("Task: " + ReduceInputCopier.this.reduceTask.getTaskID() + " - Reduce copier failed", ReduceInputCopier.this.getMergeThrowable());
                        }
                    } catch (IOException e) {
                        ReduceInputCopier.this.shuffleException = e;
                    }
                }
            };
            Thread fetcher = new Thread(fetchTask);
            this.shuffleThread = fetcher;
            fetcher.start();
            return true;
        }

        /**
         * Returns the iterator instance created in {@code init}; none of the three
         * arguments is used.
         */
        public RawKeyValueIterator createKVIterator(JobConf jobConf, FileSystem fileSystem, Reporter reporter) throws IOException {
            return this.shuffledKeyValueIterator;
        }

        /**
         * @return the recorded merge error, or {@code null} if none has been recorded
         */
        public Throwable getMergeThrowable() {
            return this.mergeException;
        }

        /**
         * Intentionally empty. NOTE(review): presumably overrides a merger set-up hook of
         * ReduceTask.ReduceCopier — confirm against the superclass.
         */
        protected void initMerger() throws IOException {
        }

        /**
         * Signals the end of input to the iterator by appending a {@code null} entry to the
         * finished list, unless a merge error has been recorded.
         *
         * @return {@code true} if no merge error was recorded, {@code false} otherwise
         */
        protected boolean closeMerger() {
            if (this.mergeException != null) {
                return false;
            }
            synchronized (this.finishedShuffleStreams) {
                // The null entry is the end-of-input marker the iterator looks for.
                this.finishedShuffleStreams.addLast(null);
                this.finishedShuffleStreams.notify();
            }
            return true;
        }

        /**
         * Copies one map output into a pooled in-memory buffer.
         *
         * <p>If no buffer is free, the given input stream is closed before waiting and a new
         * connection is opened once a buffer has been obtained. When the copy fails, the
         * buffer goes back to the free pool. The pool has a fixed size, so a buffer lost on
         * every failed copy would eventually leave all copier threads waiting forever.
         *
         * @param inputStream stream to read from; may be closed and replaced as described
         * @param j           number of bytes to copy (passed through to the copy routine)
         * @param j2          passed through to the copy routine and used, narrowed to int,
         *                    as the last argument of the returned MapOutput
         * @return a MapOutput built from the location's task ids, an empty byte array and
         *         {@code (int) j2}
         */
        protected ReduceTask.ReduceCopier.MapOutput shuffle(ReduceTask.ReduceCopier.MapOutputCopier mapOutputCopier, ReduceTask.ReduceCopier.MapOutputLocation mapOutputLocation, URLConnection uRLConnection, InputStream inputStream, ReduceTask.ReduceCopier.ShuffleClientMetrics shuffleClientMetrics, Path path, long j, long j2, long j3, boolean z) throws IOException, InterruptedException {
            ByteArrayOutputStream buffer;
            synchronized (this.freedShuffleStreams) {
                while (this.freedShuffleStreams.size() <= 0) {
                    if (inputStream != null) {
                        // Close the stream for the duration of the wait; it is re-opened
                        // below once a buffer is available.
                        try {
                            inputStream.close();
                        } catch (IOException ignored) {
                            // The stream is being discarded anyway.
                        }
                        inputStream = null;
                    }
                    this.freedShuffleStreams.wait();
                }
                buffer = this.freedShuffleStreams.removeFirst();
            }
            boolean published = false;
            try {
                if (inputStream == null) {
                    inputStream = mapOutputCopier.setupSecureConnection(mapOutputLocation, mapOutputLocation.getOutputLocation().openConnection());
                }
                new ShuffledMapOutput(buffer).shuffle(inputStream, j, j2, shuffleClientMetrics, this.reporter);
                published = true;
            } finally {
                if (!published) {
                    // The copy did not complete, so the buffer never reached the finished
                    // list; return it to the pool (it is reset before its next use).
                    synchronized (this.freedShuffleStreams) {
                        this.freedShuffleStreams.addLast(buffer);
                        this.freedShuffleStreams.notify();
                    }
                }
            }
            return new ReduceTask.ReduceCopier.MapOutput(mapOutputLocation.getTaskId(), mapOutputLocation.getTaskAttemptId(), new byte[0], (int) j2);
        }
    }

    /**
     * End-to-end test: starts a mini DFS cluster and a mini MR cluster, generates the
     * input files, runs the limit-N job and verifies its output.
     *
     * <p>Both clusters are shut down exactly once in a {@code finally} block; the MR
     * cluster is shut down even if shutting down the DFS cluster throws.
     *
     * @throws Exception if cluster start-up, the job run, or verification fails
     */
    public void testLimitNQuery() throws Exception {
        MiniDFSCluster miniDFSCluster = null;
        MiniMRCluster miniMRCluster = null;
        try {
            miniDFSCluster = new MiniDFSCluster(new Configuration(), 2, true, null);
            FileSystem fileSystem = miniDFSCluster.getFileSystem();
            miniMRCluster = new MiniMRCluster(2, fileSystem.getUri().toString(), 1);
            createInput(fileSystem);
            runLimitNQueryTest(new JobConf(miniMRCluster.createJobConf()), fileSystem);
        } finally {
            // Nested finally: a failing DFS shutdown must not leak the MR cluster.
            try {
                if (miniDFSCluster != null) {
                    miniDFSCluster.shutdown();
                }
            } finally {
                if (miniMRCluster != null) {
                    miniMRCluster.shutdown();
                }
            }
        }
    }

    /**
     * Recreates the input directory and fills it with 50 text files of 1000 lines each.
     *
     * <p>Every line has the form {@code "<n> <n>\n"} where {@code n} is a random integer
     * in {@code [0, 1000)}; key and value are deliberately identical so the output can be
     * validated later.
     *
     * @param fileSystem file system in which {@code INPUT_DIR} is (re)created
     * @throws Exception if deleting the directory or writing a file fails
     */
    private void createInput(FileSystem fileSystem) throws Exception {
        fileSystem.delete(INPUT_DIR, true);
        for (int fileIndex = 0; fileIndex < 50; fileIndex++) {
            Path inputFile = new Path(INPUT_DIR, "input_" + fileIndex + ".txt");
            // try-with-resources: the file is closed even when a write fails.
            // Explicit charset keeps the bytes independent of the platform default.
            try (Writer writer = new OutputStreamWriter(fileSystem.create(inputFile), "UTF-8")) {
                for (int lineIndex = 0; lineIndex < 1000; lineIndex++) {
                    int value = (int) (1000.0d * Math.random());
                    writer.write(value + " " + value + "\n");
                }
            }
        }
    }

    /**
     * Configures and runs the limit-N job, then verifies its output.
     *
     * <p>An {@link IOException} from submission or monitoring (including the
     * {@code "Job failed!"} raised when monitoring reports failure) is logged to
     * {@code System.err} and not rethrown; the verdict is left to
     * {@link #verifyOutput(RunningJob, FileSystem)}, which runs exactly once whether or
     * not the job completed normally. If monitoring is interrupted, the thread's
     * interrupt flag is restored and verification still runs.
     *
     * @param jobConf    base job configuration obtained from the mini MR cluster
     * @param fileSystem file system holding the input and output directories
     * @throws Exception if set-up fails, a non-I/O error occurs, or verification fails
     */
    private void runLimitNQueryTest(JobConf jobConf, FileSystem fileSystem) throws Exception {
        fileSystem.delete(DUMMY_OUTPUT, true);
        fileSystem.delete(REAL_OUTPUT, true);

        jobConf.setJobName("LimitNQueryTest");
        JobClient jobClient = new JobClient(jobConf);
        FileInputFormat.setInputPaths(jobConf, INPUT_DIR);
        FileOutputFormat.setOutputPath(jobConf, DUMMY_OUTPUT);
        jobConf.set("mapred.textoutputformat.separator", " ");
        // A single attempt per task: no retries.
        jobConf.setMaxMapAttempts(1);
        jobConf.setMaxReduceAttempts(1);
        // Presumably lets the reducer start before any map has finished -- confirm.
        jobConf.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
        jobConf.setInt(RECORD_LIMIT_ATTR, RECORD_LIMIT);
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setMapOutputKeyClass(Text.class);
        jobConf.setMapOutputValueClass(Text.class);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setMapperClass(MapperWithConditionBasedFiltering.class);
        jobConf.setReducerClass(LimitNReducer.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        jobConf.setNumReduceTasks(1);
        // Plug in the custom map-side collector and reduce-side shuffle consumer.
        jobConf.set("mapreduce.job.map.output.collector.class", MapOutputCopier.class.getName());
        jobConf.set("mapreduce.job.reduce.shuffle.consumer.plugin.class", ReduceInputCopier.class.getName());

        RunningJob runningJob = null;
        try {
            runningJob = jobClient.submitJob(jobConf);
            try {
                if (!jobClient.monitorAndPrintJob(jobConf, runningJob)) {
                    throw new IOException("Job failed!");
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
        } catch (IOException e) {
            // A failed job is not fatal by itself; the output check decides.
            System.err.println("Job failed with: " + e);
        } finally {
            verifyOutput(runningJob, fileSystem);
        }
    }

    /**
     * Validates the job result stored in {@code REAL_OUTPUT/finalOut}.
     *
     * <p>A line is valid when it has the form {@code "<key> <value>"}, both parts parse
     * as integers, {@code 50 <= key <= 1000} and {@code key == value}. Lines without a
     * space separator or with non-numeric parts are counted as invalid. The method then
     * asserts that there are no invalid lines, exactly 10 valid lines, and that fewer
     * than 50 map tasks were launched.
     *
     * @param runningJob handle used to read the launched-maps counter
     * @param fileSystem file system holding the output file
     * @throws Exception if the output cannot be read or the counters cannot be fetched
     */
    private void verifyOutput(RunningJob runningJob, FileSystem fileSystem) throws Exception {
        long validRecords = 0;
        long invalidRecords = 0;
        try (FSDataInputStream in = fileSystem.open(new Path(REAL_OUTPUT, "finalOut"))) {
            String line;
            while ((line = in.readLine()) != null) {
                int separator = line.indexOf(' ');
                if (separator < 0) {
                    // Malformed line: count it instead of failing in substring().
                    invalidRecords++;
                    continue;
                }
                int key = 0;
                int value = 0;
                try {
                    key = Integer.parseInt(line.substring(0, separator));
                    value = Integer.parseInt(line.substring(separator + 1));
                } catch (NumberFormatException e) {
                    // key/value keep defaults that fail the range check below.
                    System.err.println(e);
                }
                if (key < 50 || key > 1000 || key != value) {
                    invalidRecords++;
                } else {
                    validRecords++;
                }
            }
        }
        assertEquals("unexpected invalid records in output", 0L, invalidRecords);
        assertEquals("unexpected number of valid records in output", 10L, validRecords);
        assertTrue("expected fewer than 50 launched map tasks",
                runningJob.getCounters().getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS) < 50);
    }
}
