package org.apache.hadoop.mapred;

import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapred.SortedRanges;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/mapred/ReduceTask.class
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:hadoop-mapreduce-client-core-2.7.0-mapr-1803-r1.jar:org/apache/hadoop/mapred/ReduceTask.class */
public class ReduceTask extends Task {
    /** Class logger; assigned in the static initializer of this class. */
    private static final Log LOG;
    /** Number of map tasks; set by the five-argument constructor and by {@code readFields}. */
    private int numMaps;
    /** Map-output compression codec chosen in {@code run}; {@code null} when map output is not compressed. */
    private CompressionCodec codec;
    /** Optional map-output files supplied through {@code setLocalMapFiles} and handed to the shuffle plugin. */
    private Map<TaskAttemptID, MapOutputFile> localMapFiles;
    /** "copy" progress phase; only created in {@code run} when {@code isMapOrReduce()} is true. */
    private Progress copyPhase;
    /** "sort" progress phase; only created in {@code run} when {@code isMapOrReduce()} is true. */
    private Progress sortPhase;
    /** "reduce" progress phase; only created in {@code run} when {@code isMapOrReduce()} is true. */
    private Progress reducePhase;
    /** Counter looked up as {@code TaskCounter.SHUFFLED_MAPS}. */
    private Counters.Counter shuffledMapsCounter;
    /** Counter looked up as {@code TaskCounter.REDUCE_SHUFFLE_BYTES}. */
    private Counters.Counter reduceShuffleBytes;
    /** Counter looked up as {@code TaskCounter.REDUCE_INPUT_GROUPS}; incremented once per key group. */
    private Counters.Counter reduceInputKeyCounter;
    /** Counter looked up as {@code TaskCounter.REDUCE_INPUT_RECORDS}; incremented once per value returned. */
    private Counters.Counter reduceInputValueCounter;
    /** Counter looked up as {@code TaskCounter.REDUCE_OUTPUT_RECORDS}; incremented by the tracking writers. */
    private Counters.Counter reduceOutputCounter;
    /** Counter looked up as {@code TaskCounter.COMBINE_INPUT_RECORDS}. */
    private Counters.Counter reduceCombineInputCounter;
    /** Counter looked up as {@code TaskCounter.COMBINE_OUTPUT_RECORDS}. */
    private Counters.Counter reduceCombineOutputCounter;
    /** Counter looked up as {@code FileOutputFormatCounter.BYTES_WRITTEN}; updated by the tracking writers. */
    private Counters.Counter fileOutputByteCounter;
    /** Ordering used by {@link #mapOutputFilesOnDisk}; built in the constructors. */
    private Comparator<FileStatus> mapOutputFileComparator;
    /** Sorted set of map-output file statuses; within this class it is only ever cleared (in {@code run}). */
    private final SortedSet<FileStatus> mapOutputFilesOnDisk;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/mapred/ReduceTask$NewTrackingRecordWriter.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-core-2.7.0-mapr-1803-r1.jar:org/apache/hadoop/mapred/ReduceTask$NewTrackingRecordWriter.class */
    /**
     * New-API record writer that forwards every call to the writer produced by the
     * task's output format while keeping two counters current: the number of records
     * written and the number of bytes written to the output file system.
     *
     * <p>Byte counts are derived by sampling the file-system statistics before and
     * after each delegated call and adding the difference to the byte counter.
     */
    public static class NewTrackingRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
        /** The writer that actually performs the output. */
        private final org.apache.hadoop.mapreduce.RecordWriter<K, V> real;
        private final Counter outputRecordCounter;
        private final Counter fileOutputByteCounter;
        /** Statistics to sample; {@code null} when the output format is not a {@code FileOutputFormat}. */
        private final List<FileSystem.Statistics> fsStats;

        /**
         * Creates the delegate writer and charges any bytes written during its creation.
         *
         * @param reduceTask         task supplying the counters and the output format
         * @param taskAttemptContext context used to resolve the output path and create the writer
         */
        NewTrackingRecordWriter(ReduceTask reduceTask, org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext) throws InterruptedException, IOException {
            this.outputRecordCounter = reduceTask.reduceOutputCounter;
            this.fileOutputByteCounter = reduceTask.fileOutputByteCounter;

            List<FileSystem.Statistics> matchedStats = null;
            if (reduceTask.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
                matchedStats = Task.getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(taskAttemptContext), taskAttemptContext.getConfiguration());
            }
            this.fsStats = matchedStats;

            long bytesBefore = getOutputBytes(this.fsStats);
            this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) reduceTask.outputFormat.getRecordWriter(taskAttemptContext);
            long bytesAfter = getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesAfter - bytesBefore);
        }

        /** Closes the delegate and records the bytes written while closing. */
        @Override // org.apache.hadoop.mapreduce.RecordWriter
        public void close(org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            long bytesBefore = getOutputBytes(this.fsStats);
            this.real.close(taskAttemptContext);
            long bytesAfter = getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesAfter - bytesBefore);
        }

        /** Writes one pair through the delegate, then updates the byte and record counters. */
        @Override // org.apache.hadoop.mapreduce.RecordWriter
        public void write(K k, V v) throws IOException, InterruptedException {
            long bytesBefore = getOutputBytes(this.fsStats);
            this.real.write(k, v);
            long bytesAfter = getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesAfter - bytesBefore);
            this.outputRecordCounter.increment(1L);
        }

        /**
         * Sums {@code getBytesWritten()} over the given statistics.
         *
         * @return the total, or 0 when {@code list} is {@code null}
         */
        private long getOutputBytes(List<FileSystem.Statistics> list) {
            long total = 0L;
            if (list != null) {
                for (FileSystem.Statistics stat : list) {
                    total += stat.getBytesWritten();
                }
            }
            return total;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/mapred/ReduceTask$OldTrackingRecordWriter.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-core-2.7.0-mapr-1803-r1.jar:org/apache/hadoop/mapred/ReduceTask$OldTrackingRecordWriter.class */
    /**
     * Old-API record writer that forwards every call to the writer produced by the
     * job's output format while keeping the reduce output-record counter and the
     * file bytes-written counter current.
     *
     * <p>Byte counts are derived by sampling the file-system statistics before and
     * after each delegated call and adding the difference to the byte counter.
     */
    public static class OldTrackingRecordWriter<K, V> implements RecordWriter<K, V> {
        /** The writer that actually performs the output. */
        private final RecordWriter<K, V> real;
        private final Counters.Counter reduceOutputCounter;
        private final Counters.Counter fileOutputByteCounter;
        /** Statistics to sample; {@code null} when the output format is not a {@code FileOutputFormat}. */
        private final List<FileSystem.Statistics> fsStats;

        /**
         * Creates the delegate writer and charges any bytes written during its creation.
         *
         * @param reduceTask   task supplying the counters
         * @param jobConf      job configuration providing the output format and path
         * @param taskReporter reporter passed to the output format
         * @param str          output name passed to the output format
         */
        public OldTrackingRecordWriter(ReduceTask reduceTask, JobConf jobConf, Task.TaskReporter taskReporter, String str) throws IOException {
            this.reduceOutputCounter = reduceTask.reduceOutputCounter;
            this.fileOutputByteCounter = reduceTask.fileOutputByteCounter;

            List<FileSystem.Statistics> matchedStats = null;
            if (jobConf.getOutputFormat() instanceof FileOutputFormat) {
                matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(jobConf), jobConf);
            }
            this.fsStats = matchedStats;

            FileSystem fs = FileSystem.get(jobConf);
            long bytesBefore = getOutputBytes(this.fsStats);
            this.real = jobConf.getOutputFormat().getRecordWriter(fs, jobConf, str, taskReporter);
            long bytesAfter = getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesAfter - bytesBefore);
        }

        /** Writes one pair through the delegate, then updates the byte and record counters. */
        @Override // org.apache.hadoop.mapred.RecordWriter
        public void write(K k, V v) throws IOException {
            long bytesBefore = getOutputBytes(this.fsStats);
            this.real.write(k, v);
            long bytesAfter = getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesAfter - bytesBefore);
            this.reduceOutputCounter.increment(1L);
        }

        /** Closes the delegate and records the bytes written while closing. */
        @Override // org.apache.hadoop.mapred.RecordWriter
        public void close(Reporter reporter) throws IOException {
            long bytesBefore = getOutputBytes(this.fsStats);
            this.real.close(reporter);
            long bytesAfter = getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesAfter - bytesBefore);
        }

        /**
         * Sums {@code getBytesWritten()} over the given statistics.
         *
         * @return the total, or 0 when {@code list} is {@code null}
         */
        private long getOutputBytes(List<FileSystem.Statistics> list) {
            long total = 0L;
            if (list != null) {
                for (FileSystem.Statistics stat : list) {
                    total += stat.getBytesWritten();
                }
            }
            return total;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/mapred/ReduceTask$ReduceValuesIterator.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-core-2.7.0-mapr-1803-r1.jar:org/apache/hadoop/mapred/ReduceTask$ReduceValuesIterator.class */
    /**
     * Values iterator for the reduce phase. On top of the base iterator it counts
     * every value handed out in the enclosing task's reduce-input-records counter
     * and can push the input iterator's progress into the task's reduce phase.
     */
    public class ReduceValuesIterator<KEY, VALUE> extends Task.ValuesIterator<KEY, VALUE> {
        public ReduceValuesIterator(RawKeyValueIterator rawKeyValueIterator, RawComparator<KEY> rawComparator, Class<KEY> cls, Class<VALUE> cls2, Configuration configuration, Progressable progressable) throws IOException {
            super(rawKeyValueIterator, rawComparator, cls, cls2, configuration, progressable);
        }

        /** Counts the value in the reduce-input-records counter, then returns it. */
        @Override // org.apache.hadoop.mapred.Task.ValuesIterator, java.util.Iterator
        public VALUE next() {
            ReduceTask.this.reduceInputValueCounter.increment(1L);
            VALUE value = moveToNext();
            return value;
        }

        /** Advances the underlying iterator without touching the input-records counter. */
        protected VALUE moveToNext() {
            VALUE value = (VALUE) super.next();
            return value;
        }

        /** Copies the input iterator's progress into the reduce phase and pings the reporter. */
        public void informReduceProgress() {
            float inputFraction = this.in.getProgress().getProgress();
            ReduceTask.this.reducePhase.set(inputFraction);
            this.reporter.progress();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/mapred/ReduceTask$SkippingReduceValuesIterator.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-core-2.7.0-mapr-1803-r1.jar:org/apache/hadoop/mapred/ReduceTask$SkippingReduceValuesIterator.class */
    /**
     * Values iterator used when the task is in skipping mode. Before the first
     * group and after every {@link #nextKey()} it fast-forwards past key groups
     * whose index is below the next index produced by the task's skip-range
     * iterator, optionally appending the skipped records to a sequence file and
     * counting the skipped groups and records.
     */
    public class SkippingReduceValuesIterator<KEY, VALUE> extends ReduceValuesIterator<KEY, VALUE> {
        private SortedRanges.SkipRangeIterator skipIt;
        private TaskUmbilicalProtocol umbilical;
        private Counters.Counter skipGroupCounter;
        private Counters.Counter skipRecCounter;
        /** Index of the current key group; starts at -1 and is bumped before each group is examined. */
        private long grpIndex;
        private Class<KEY> keyClass;
        private Class<VALUE> valClass;
        /** Lazily created by {@link #writeSkippedRec}. */
        private SequenceFile.Writer skipWriter;
        private boolean toWriteSkipRecs;
        /** Result of the most recent {@code skipIt.hasNext()} check. */
        private boolean hasNext;
        private Task.TaskReporter reporter;

        public SkippingReduceValuesIterator(RawKeyValueIterator rawKeyValueIterator, RawComparator<KEY> rawComparator, Class<KEY> cls, Class<VALUE> cls2, Configuration configuration, Task.TaskReporter taskReporter, TaskUmbilicalProtocol taskUmbilicalProtocol) throws IOException {
            super(rawKeyValueIterator, rawComparator, cls, cls2, configuration, taskReporter);
            this.grpIndex = -1L;
            this.umbilical = taskUmbilicalProtocol;
            this.skipGroupCounter = taskReporter.getCounter((Enum<?>) TaskCounter.REDUCE_SKIPPED_GROUPS);
            this.skipRecCounter = taskReporter.getCounter((Enum<?>) TaskCounter.REDUCE_SKIPPED_RECORDS);

            // Skipped records are persisted only when the task asks for it AND a skip output path exists.
            boolean persistSkipped = false;
            if (ReduceTask.this.toWriteSkipRecs()) {
                persistSkipped = SkipBadRecords.getSkipOutputPath(configuration) != null;
            }
            this.toWriteSkipRecs = persistSkipped;

            this.keyClass = cls;
            this.valClass = cls2;
            this.reporter = taskReporter;
            this.skipIt = ReduceTask.this.getSkipRanges().skipRangeIterator();
            mayBeSkip();
        }

        /** Moves to the next key group and then skips any groups that must not be processed. */
        @Override // org.apache.hadoop.mapred.Task.ValuesIterator
        public void nextKey() throws IOException {
            super.nextKey();
            mayBeSkip();
        }

        /** True only while the input has more groups and the skip-range iterator still had an index. */
        @Override // org.apache.hadoop.mapred.Task.ValuesIterator
        public boolean more() {
            if (!super.more()) {
                return false;
            }
            return this.hasNext;
        }

        /**
         * Consumes key groups until the current group index reaches the next index
         * supplied by the skip-range iterator (or the input is exhausted), then
         * updates the skip counters and reports the current group index.
         */
        private void mayBeSkip() throws IOException {
            this.hasNext = this.skipIt.hasNext();
            if (!this.hasNext) {
                ReduceTask.LOG.warn("Further groups got skipped.");
                return;
            }

            this.grpIndex++;
            long nextGroupToProcess = this.skipIt.next().longValue();
            long skippedGroups = 0;
            long skippedRecords = 0;

            while (this.grpIndex < nextGroupToProcess && super.more()) {
                // Drain every value of the group being skipped.
                while (hasNext()) {
                    VALUE skippedValue = moveToNext();
                    if (this.toWriteSkipRecs) {
                        writeSkippedRec(getKey(), skippedValue);
                    }
                    skippedRecords++;
                }
                super.nextKey();
                this.grpIndex++;
                skippedGroups++;
            }

            // Close the skip file once something was skipped and the iterator reports all ranges skipped.
            if (skippedGroups > 0 && this.skipIt.skippedAllRanges() && this.skipWriter != null) {
                this.skipWriter.close();
            }

            this.skipGroupCounter.increment(skippedGroups);
            this.skipRecCounter.increment(skippedRecords);
            ReduceTask.this.reportNextRecordRange(this.umbilical, this.grpIndex);
        }

        /** Appends one skipped record to the skip file, creating the writer on first use. */
        private void writeSkippedRec(KEY key, VALUE value) throws IOException {
            if (this.skipWriter == null) {
                Path skipDir = SkipBadRecords.getSkipOutputPath(ReduceTask.this.conf);
                Path skipFile = new Path(skipDir, ReduceTask.this.getTaskID().toString());
                FileSystem skipFs = skipFile.getFileSystem(ReduceTask.this.conf);
                this.skipWriter = SequenceFile.createWriter(skipFs, ReduceTask.this.conf, skipFile, this.keyClass, this.valClass, SequenceFile.CompressionType.BLOCK, this.reporter);
            }
            this.skipWriter.append(key, value);
        }
    }

    /**
     * Creates an uninitialised reduce task. This is the constructor used by the
     * {@code WritableFactory} registered in this class's static initializer.
     *
     * <p>Sets the progress status to "reduce", starts in the SHUFFLE phase, looks
     * up the task counters and builds the sorted set of on-disk map-output files.
     */
    public ReduceTask() {
        getProgress().setStatus("reduce");
        setPhase(TaskStatus.Phase.SHUFFLE);
        this.shuffledMapsCounter = getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
        this.reduceShuffleBytes = getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
        this.reduceInputKeyCounter = getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
        this.reduceOutputCounter = getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
        this.reduceCombineInputCounter = getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
        this.reduceCombineOutputCounter = getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        this.fileOutputByteCounter = getCounters().findCounter(FileOutputFormatCounter.BYTES_WRITTEN);
        this.mapOutputFileComparator = new Comparator<FileStatus>() {
            /**
             * Orders files by length, smallest first. Files of equal length are
             * ordered by their path string so that compare(a, b) and compare(b, a)
             * always have opposite signs, as the Comparator contract (and the
             * TreeSet built on it) requires; 0 is returned only for equal paths.
             */
            @Override // java.util.Comparator
            public int compare(FileStatus fileStatus, FileStatus fileStatus2) {
                long len1 = fileStatus.getLen();
                long len2 = fileStatus2.getLen();
                if (len1 != len2) {
                    return len1 < len2 ? -1 : 1;
                }
                return fileStatus.getPath().toString().compareTo(fileStatus2.getPath().toString());
            }
        };
        this.mapOutputFilesOnDisk = new TreeSet<FileStatus>(this.mapOutputFileComparator);
    }

    /**
     * Creates a reduce task for a specific attempt.
     *
     * <p>Sets the progress status to "reduce", starts in the SHUFFLE phase, looks
     * up the task counters and builds the sorted set of on-disk map-output files.
     *
     * @param str           forwarded to the {@code Task} constructor (first argument)
     * @param taskAttemptID forwarded to the {@code Task} constructor
     * @param i             forwarded to the {@code Task} constructor (third argument)
     * @param i2            number of map tasks; stored in {@code numMaps}
     * @param i3            forwarded to the {@code Task} constructor (fourth argument)
     */
    public ReduceTask(String str, TaskAttemptID taskAttemptID, int i, int i2, int i3) {
        super(str, taskAttemptID, i, i3);
        getProgress().setStatus("reduce");
        setPhase(TaskStatus.Phase.SHUFFLE);
        this.shuffledMapsCounter = getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
        this.reduceShuffleBytes = getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
        this.reduceInputKeyCounter = getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
        this.reduceOutputCounter = getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
        this.reduceCombineInputCounter = getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
        this.reduceCombineOutputCounter = getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        this.fileOutputByteCounter = getCounters().findCounter(FileOutputFormatCounter.BYTES_WRITTEN);
        this.mapOutputFileComparator = new Comparator<FileStatus>() {
            /**
             * Orders files by length, smallest first. Files of equal length are
             * ordered by their path string so that compare(a, b) and compare(b, a)
             * always have opposite signs, as the Comparator contract (and the
             * TreeSet built on it) requires; 0 is returned only for equal paths.
             */
            @Override // java.util.Comparator
            public int compare(FileStatus fileStatus, FileStatus fileStatus2) {
                long len1 = fileStatus.getLen();
                long len2 = fileStatus2.getLen();
                if (len1 != len2) {
                    return len1 < len2 ? -1 : 1;
                }
                return fileStatus.getPath().toString().compareTo(fileStatus2.getPath().toString());
            }
        };
        this.mapOutputFilesOnDisk = new TreeSet<FileStatus>(this.mapOutputFileComparator);
        this.numMaps = i2;
    }

    /**
     * Stores the map-output files keyed by map task attempt. The map is kept by
     * reference and later passed to the shuffle plugin's context in {@code run}.
     *
     * @param map map-output files per map attempt; stored as-is, without copying
     */
    public void setLocalMapFiles(Map<TaskAttemptID, MapOutputFile> map) {
        this.localMapFiles = map;
    }

    /**
     * Builds the codec used for compressed map output.
     *
     * @return a new instance of the configured map-output compressor class
     *         (falling back to {@code DefaultCodec}), or {@code null} when the
     *         configuration says map output is not compressed
     */
    private CompressionCodec initCodec() {
        if (!this.conf.getCompressMapOutput()) {
            return null;
        }
        Class<? extends CompressionCodec> codecClass = this.conf.getMapOutputCompressorClass(DefaultCodec.class);
        return (CompressionCodec) ReflectionUtils.newInstance(codecClass, this.conf);
    }

    /**
     * Identifies this task as a reduce task.
     *
     * @return always {@code false}
     */
    @Override // org.apache.hadoop.mapred.Task
    public boolean isMapTask() {
        return false;
    }

    /**
     * @return the number of map tasks recorded for this reduce task
     */
    public int getNumMaps() {
        return this.numMaps;
    }

    /**
     * Applies the base-class localisation and then writes this task's map count
     * into the job configuration.
     *
     * @param jobConf configuration to update in place
     * @throws IOException if the base-class localisation fails
     */
    @Override // org.apache.hadoop.mapred.Task
    public void localizeConfiguration(JobConf jobConf) throws IOException {
        super.localizeConfiguration(jobConf);
        jobConf.setNumMapTasks(this.numMaps);
    }

    /**
     * Serialises this task: the base-class fields first, followed by the map
     * count as a single int. Must stay in step with {@code readFields}.
     *
     * @param dataOutput sink to write to
     * @throws IOException if writing fails
     */
    @Override // org.apache.hadoop.mapred.Task
    public void write(DataOutput dataOutput) throws IOException {
        super.write(dataOutput);
        dataOutput.writeInt(this.numMaps);
    }

    /**
     * Deserialises this task: the base-class fields first, followed by the map
     * count as a single int. Must stay in step with {@code write}.
     *
     * @param dataInput source to read from
     * @throws IOException if reading fails
     */
    @Override // org.apache.hadoop.mapred.Task
    public void readFields(DataInput dataInput) throws IOException {
        super.readFields(dataInput);
        this.numMaps = dataInput.readInt();
    }

    /**
     * Collects the input file path for every map index from 0 (inclusive) to
     * {@code numMaps} (exclusive), in index order.
     *
     * @param fileSystem not used by this method; kept for signature compatibility
     * @return the paths returned by {@code mapOutputFile.getInputFile(i)}; empty
     *         when {@code numMaps} is not positive
     * @throws IOException if resolving an input file fails
     */
    private Path[] getMapFiles(FileSystem fileSystem) throws IOException {
        // Typed list instead of a raw ArrayList, so no unchecked array cast is needed.
        List<Path> inputFiles = new ArrayList<Path>();
        for (int i = 0; i < this.numMaps; i++) {
            inputFiles.add(this.mapOutputFile.getInputFile(i));
        }
        return inputFiles.toArray(new Path[0]);
    }

    /**
     * Runs this task attempt. Job-cleanup, job-setup and task-cleanup attempts are
     * dispatched to their dedicated runners and return immediately. Otherwise the
     * method runs the configured shuffle plugin, switches to the REDUCE phase,
     * executes the reducer through the new or old API depending on the job
     * configuration, closes the shuffle plugin and reports the task as done.
     *
     * @param jobConf               job configuration; the skip-records flag is written into it
     * @param taskUmbilicalProtocol channel used for status updates and completion reporting
     * @throws IOException            propagated from the shuffle, the reducer or reporting
     * @throws InterruptedException   propagated from the called task machinery
     * @throws ClassNotFoundException propagated from the called task machinery
     */
    @Override // org.apache.hadoop.mapred.Task
    public void run(JobConf jobConf, TaskUmbilicalProtocol taskUmbilicalProtocol) throws IOException, InterruptedException, ClassNotFoundException {
        // Publish whether this attempt is in skipping mode.
        jobConf.setBoolean(MRJobConfig.SKIP_RECORDS, isSkipping());
        // The three progress phases are only registered when isMapOrReduce() is true.
        if (isMapOrReduce()) {
            this.copyPhase = getProgress().addPhase("copy");
            this.sortPhase = getProgress().addPhase("sort");
            this.reducePhase = getProgress().addPhase("reduce");
        }
        Task.TaskReporter startReporter = startReporter(taskUmbilicalProtocol);
        boolean useNewReducer = jobConf.getUseNewReducer();
        initialize(jobConf, getJobID(), startReporter, useNewReducer);
        // Auxiliary task kinds: run the matching handler and stop here.
        if (this.jobCleanup) {
            runJobCleanupTask(taskUmbilicalProtocol, startReporter);
            return;
        }
        if (this.jobSetup) {
            runJobSetupTask(taskUmbilicalProtocol, startReporter);
            return;
        }
        if (this.taskCleanup) {
            runTaskCleanupTask(taskUmbilicalProtocol, startReporter);
            return;
        }
        this.codec = initCodec();
        Class<? extends Reducer> combinerClass = this.conf.getCombinerClass();
        // A combine collector is only created when a combiner class is configured.
        Task.CombineOutputCollector combineOutputCollector = null != combinerClass ? new Task.CombineOutputCollector(this.reduceCombineOutputCounter, startReporter, this.conf) : null;
        // The shuffle implementation is pluggable; Shuffle is the default class.
        ShuffleConsumerPlugin shuffleConsumerPlugin = (ShuffleConsumerPlugin) ReflectionUtils.newInstance(jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class), jobConf);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
        shuffleConsumerPlugin.init(new ShuffleConsumerPlugin.Context(getTaskID(), jobConf, FileSystem.getLocal(jobConf), taskUmbilicalProtocol, this.lDirAlloc, startReporter, this.codec, combinerClass, combineOutputCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles));
        // The plugin's result is the raw key/value iterator fed to the reducer below.
        RawKeyValueIterator run = shuffleConsumerPlugin.run();
        this.mapOutputFilesOnDisk.clear();
        // Shuffle finished: complete the sort phase, enter REDUCE and send a status update.
        this.sortPhase.complete();
        setPhase(TaskStatus.Phase.REDUCE);
        statusUpdate(taskUmbilicalProtocol);
        Class<?> mapOutputKeyClass = jobConf.getMapOutputKeyClass();
        Class<?> mapOutputValueClass = jobConf.getMapOutputValueClass();
        RawComparator outputValueGroupingComparator = jobConf.getOutputValueGroupingComparator();
        if (useNewReducer) {
            runNewReducer(jobConf, taskUmbilicalProtocol, startReporter, run, outputValueGroupingComparator, mapOutputKeyClass, mapOutputValueClass);
        } else {
            runOldReducer(jobConf, taskUmbilicalProtocol, startReporter, run, outputValueGroupingComparator, mapOutputKeyClass, mapOutputValueClass);
        }
        // NOTE(review): not inside a finally block, so the plugin is not closed here if the reducer throws.
        shuffleConsumerPlugin.close();
        done(taskUmbilicalProtocol, startReporter);
    }

    /**
     * Runs an old-API ({@code org.apache.hadoop.mapred}) reducer over the shuffled input.
     *
     * <p>The reducer is instantiated from the job configuration, its output is routed
     * through an {@link OldTrackingRecordWriter}, and each key group is passed to
     * {@code reduce}. When the task is in skipping mode a
     * {@link SkippingReduceValuesIterator} is used so that skipped groups are bypassed.
     *
     * <p>The reducer and the writer are closed on the normal path. If anything throws,
     * the {@code finally} block closes whichever of them is still open on a best-effort
     * basis (failures there are logged, not thrown), and the original exception propagates.
     *
     * @param jobConf               job configuration
     * @param taskUmbilicalProtocol passed to the skipping iterator for range reporting
     * @param taskReporter          reporter used for progress and counters
     * @param rawKeyValueIterator   shuffled, raw key/value input
     * @param rawComparator         grouping comparator used in skipping mode
     * @param cls                   key class
     * @param cls2                  value class
     * @throws IOException if reading input, reducing, or closing on the normal path fails
     */
    @SuppressWarnings("unchecked")
    private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldReducer(JobConf jobConf, TaskUmbilicalProtocol taskUmbilicalProtocol, final Task.TaskReporter taskReporter, RawKeyValueIterator rawKeyValueIterator, RawComparator<INKEY> rawComparator, Class<INKEY> cls, Class<INVALUE> cls2) throws IOException {
        Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(), jobConf);
        final RecordWriter<OUTKEY, OUTVALUE> trackedWriter = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(this, jobConf, taskReporter, getOutputName(getPartition()));
        OutputCollector<OUTKEY, OUTVALUE> outputCollector = new OutputCollector<OUTKEY, OUTVALUE>() {
            @Override // org.apache.hadoop.mapred.OutputCollector
            public void collect(OUTKEY outkey, OUTVALUE outvalue) throws IOException {
                trackedWriter.write(outkey, outvalue);
                // Signal liveness for every emitted record.
                taskReporter.progress();
            }
        };
        // Non-final alias: set to null after a clean close so the finally block
        // only touches resources that are still open.
        RecordWriter<OUTKEY, OUTVALUE> out = trackedWriter;
        try {
            // The processed-groups counter is only maintained when group skipping is
            // enabled and auto-increment is requested.
            boolean countProcessedGroups = SkipBadRecords.getReducerMaxSkipGroups(jobConf) > 0 && SkipBadRecords.getAutoIncrReducerProcCount(jobConf);
            ReduceValuesIterator<INKEY, INVALUE> values = isSkipping()
                    ? new SkippingReduceValuesIterator<INKEY, INVALUE>(rawKeyValueIterator, rawComparator, cls, cls2, jobConf, taskReporter, taskUmbilicalProtocol)
                    : new ReduceValuesIterator<INKEY, INVALUE>(rawKeyValueIterator, jobConf.getOutputValueGroupingComparator(), cls, cls2, jobConf, taskReporter);
            values.informReduceProgress();
            while (values.more()) {
                this.reduceInputKeyCounter.increment(1L);
                reducer.reduce(values.getKey(), values, outputCollector, taskReporter);
                if (countProcessedGroups) {
                    taskReporter.incrCounter(SkipBadRecords.COUNTER_GROUP, SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1L);
                }
                values.nextKey();
                values.informReduceProgress();
            }
            reducer.close();
            reducer = null;
            out.close(taskReporter);
            out = null;
        } finally {
            // Both calls are no-ops for null arguments, i.e. after a clean shutdown.
            IOUtils.cleanup(LOG, reducer);
            closeQuietly(out, taskReporter);
        }
    }

    /**
     * Runs a new-API ({@code org.apache.hadoop.mapreduce}) reducer over the shuffled input.
     *
     * <p>The input iterator is wrapped so that every call to {@code next()} also pushes
     * the iterator's progress to the reporter. Output goes through a
     * {@link NewTrackingRecordWriter}, which is closed exactly once in a {@code finally}
     * block whether or not the reducer completes normally.
     *
     * @param jobConf               job configuration; the skip flags are written into it
     * @param taskUmbilicalProtocol not used by this method
     * @param taskReporter          reporter used for progress and as the status reporter of the context
     * @param rawKeyValueIterator   shuffled, raw key/value input
     * @param rawComparator         grouping comparator handed to the reduce context
     * @param cls                   key class
     * @param cls2                  value class
     * @throws IOException            if reducing or closing the writer fails
     * @throws InterruptedException   propagated from the reducer or writer
     * @throws ClassNotFoundException if the reducer class cannot be resolved
     */
    @SuppressWarnings("unchecked")
    private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewReducer(JobConf jobConf, TaskUmbilicalProtocol taskUmbilicalProtocol, final Task.TaskReporter taskReporter, final RawKeyValueIterator rawKeyValueIterator, RawComparator<INKEY> rawComparator, Class<INKEY> cls, Class<INVALUE> cls2) throws IOException, InterruptedException, ClassNotFoundException {
        // Delegating iterator whose only addition is progress reporting on next().
        RawKeyValueIterator progressReportingIterator = new RawKeyValueIterator() {
            @Override // org.apache.hadoop.mapred.RawKeyValueIterator
            public void close() throws IOException {
                rawKeyValueIterator.close();
            }

            @Override // org.apache.hadoop.mapred.RawKeyValueIterator
            public DataInputBuffer getKey() throws IOException {
                return rawKeyValueIterator.getKey();
            }

            @Override // org.apache.hadoop.mapred.RawKeyValueIterator
            public Progress getProgress() {
                return rawKeyValueIterator.getProgress();
            }

            @Override // org.apache.hadoop.mapred.RawKeyValueIterator
            public DataInputBuffer getValue() throws IOException {
                return rawKeyValueIterator.getValue();
            }

            @Override // org.apache.hadoop.mapred.RawKeyValueIterator
            public boolean next() throws IOException {
                boolean advanced = rawKeyValueIterator.next();
                taskReporter.setProgress(rawKeyValueIterator.getProgress().getProgress());
                return advanced;
            }
        };
        org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl taskAttemptContextImpl = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(jobConf, getTaskID(), taskReporter);
        org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer = (org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>) ReflectionUtils.newInstance(taskAttemptContextImpl.getReducerClass(), jobConf);
        org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> trackedWriter = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskAttemptContextImpl);
        // The skipping flag is published under both the literal key and the MRJobConfig key.
        jobConf.setBoolean("mapred.skip.on", isSkipping());
        jobConf.setBoolean(MRJobConfig.SKIP_RECORDS, isSkipping());
        // Raw Context type: the reducer's own type parameters are not known at this point.
        org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, jobConf, getTaskID(), progressReportingIterator, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedWriter, this.committer, taskReporter, rawComparator, cls, cls2);
        try {
            reducer.run(reducerContext);
        } finally {
            // Single close on every path; a failing close is not retried.
            trackedWriter.close(reducerContext);
        }
    }

    /**
     * Best-effort close of an old-API record writer. A {@code null} writer is
     * ignored; any {@link Exception} raised by {@code close} is logged at info
     * level and not rethrown.
     *
     * @param recordWriter writer to close, may be {@code null}
     * @param reporter     reporter handed to {@code close}
     */
    private <OUTKEY, OUTVALUE> void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> recordWriter, Reporter reporter) {
        if (recordWriter == null) {
            return;
        }
        try {
            recordWriter.close(reporter);
        } catch (Exception closeFailure) {
            LOG.info("Exception in closing " + recordWriter, closeFailure);
        }
    }

    // Class initialisation: registers a WritableFactory that creates ReduceTask
    // instances through the no-argument constructor, then creates the class logger.
    static {
        WritableFactories.setFactory(ReduceTask.class, new WritableFactory() { // from class: org.apache.hadoop.mapred.ReduceTask.1
            public Writable newInstance() {
                return new ReduceTask();
            }
        });
        // LOG is declared final without an initializer, so this is its single assignment.
        LOG = LogFactory.getLog(ReduceTask.class.getName());
    }
}
