/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapRFsOutputFile;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.DirectInMemoryOutput;
import org.apache.hadoop.mapreduce.task.reduce.DirectInMemoryReader;
import org.apache.hadoop.mapreduce.task.reduce.DirectOnDiskMapOutput;
import org.apache.hadoop.mapreduce.task.reduce.DirectShuffleMergeManagerImpl;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeManager;
import org.apache.hadoop.mapreduce.task.reduce.MergeThread;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

@InterfaceAudience.LimitedPrivate(value={"MapReduce"})
@InterfaceStability.Unstable
public class DirectShuffleMergeManagerImpl<K, V>
implements MergeManager<K, V> {
    private static final Log LOG = LogFactory.getLog(DirectShuffleMergeManagerImpl.class);
    private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT = 0.25f;
    private final TaskAttemptID reduceId;
    private final JobConf jobConf;
    private final FileSystem localFS;
    private final FileSystem rfs;
    protected MapOutputFile mapOutputFile;
    Set<DirectInMemoryOutput<K, V>> inMemoryMergedMapOutputs = new TreeSet(new MapOutput.MapOutputComparator());
    private IntermediateMemoryToMemoryMerger memToMemMerger;
    Set<DirectInMemoryOutput<K, V>> inMemoryMapOutputs = new TreeSet(new MapOutput.MapOutputComparator());
    private final MergeThread<DirectInMemoryOutput<K, V>, K, V> inMemoryMerger;
    Set<FileStatus> onDiskMapOutputs = new TreeSet(new /* Unavailable Anonymous Inner Class!! */);
    private final OnDiskMerger onDiskMerger;
    private final long memoryLimit;
    private long usedMemory;
    private long commitMemory;
    private final long maxSingleShuffleLimit;
    private final int memToMemMergeOutputsThreshold;
    private final long mergeThreshold;
    private final int ioSortFactor;
    private final Reporter reporter;
    private final ExceptionReporter exceptionReporter;
    private final Class<? extends Reducer> combinerClass;
    private final Task.CombineOutputCollector<K, V> combineCollector;
    private final Counters.Counter spilledRecordsCounter;
    private final Counters.Counter reduceCombineInputCounter;
    private final Counters.Counter mergedMapOutputsCounter;
    private final CompressionCodec codec;
    private final Progress mergePhase;

    public DirectShuffleMergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, Reporter reporter, CompressionCodec codec, Class<? extends Reducer> combinerClass, Task.CombineOutputCollector<K, V> combineCollector, Counters.Counter spilledRecordsCounter, Counters.Counter reduceCombineInputCounter, Counters.Counter mergedMapOutputsCounter, ExceptionReporter exceptionReporter, Progress mergePhase, MapOutputFile mapOutputFile) throws IOException {
        this.reduceId = reduceId;
        this.jobConf = jobConf;
        this.exceptionReporter = exceptionReporter;
        this.reporter = reporter;
        this.codec = codec;
        this.combinerClass = combinerClass;
        this.combineCollector = combineCollector;
        this.reduceCombineInputCounter = reduceCombineInputCounter;
        this.spilledRecordsCounter = spilledRecordsCounter;
        this.mergedMapOutputsCounter = mergedMapOutputsCounter;
        this.mapOutputFile = mapOutputFile;
        this.mapOutputFile.setConf((Configuration)jobConf);
        this.rfs = this.localFS = FileSystem.get((Configuration)jobConf);
        float maxInMemCopyUse = jobConf.getFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.9f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for mapreduce.reduce.shuffle.input.buffer.percent: " + maxInMemCopyUse);
        }
        this.memoryLimit = (long)((float)jobConf.getLong("mapreduce.reduce.memory.totalbytes", Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
        this.ioSortFactor = jobConf.getInt("mapreduce.task.io.sort.factor", 100);
        float singleShuffleMemoryLimitPercent = jobConf.getFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.25f);
        if (singleShuffleMemoryLimitPercent <= 0.0f || singleShuffleMemoryLimitPercent > 1.0f) {
            throw new IllegalArgumentException("Invalid value for mapreduce.reduce.shuffle.memory.limit.percent: " + singleShuffleMemoryLimitPercent);
        }
        this.usedMemory = 0L;
        this.commitMemory = 0L;
        this.maxSingleShuffleLimit = (long)((float)this.memoryLimit * singleShuffleMemoryLimitPercent);
        this.memToMemMergeOutputsThreshold = jobConf.getInt("mapreduce.reduce.merge.memtomem.threshold", this.ioSortFactor);
        this.mergeThreshold = (long)((float)this.memoryLimit * jobConf.getFloat("mapreduce.reduce.shuffle.merge.percent", 0.9f));
        LOG.info((Object)("MergerManager: memoryLimit=" + this.memoryLimit + ", " + "maxSingleShuffleLimit=" + this.maxSingleShuffleLimit + ", " + "mergeThreshold=" + this.mergeThreshold + ", " + "ioSortFactor=" + this.ioSortFactor + ", " + "memToMemMergeOutputsThreshold=" + this.memToMemMergeOutputsThreshold));
        if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
            throw new RuntimeException("Invlaid configuration: maxSingleShuffleLimit should be less than mergeThresholdmaxSingleShuffleLimit: " + this.maxSingleShuffleLimit + "mergeThreshold: " + this.mergeThreshold);
        }
        boolean allowMemToMemMerge = jobConf.getBoolean("mapreduce.reduce.merge.memtomem.enabled", false);
        if (allowMemToMemMerge) {
            this.memToMemMerger = new IntermediateMemoryToMemoryMerger(this, this, this.memToMemMergeOutputsThreshold);
            this.memToMemMerger.start();
        } else {
            this.memToMemMerger = null;
        }
        this.inMemoryMerger = this.createInMemoryMerger();
        this.inMemoryMerger.start();
        this.onDiskMerger = new OnDiskMerger(this, this);
        this.onDiskMerger.start();
        this.mergePhase = mergePhase;
    }

    protected MergeThread<DirectInMemoryOutput<K, V>, K, V> createInMemoryMerger() {
        return new InMemoryMerger(this, this);
    }

    TaskAttemptID getReduceId() {
        return this.reduceId;
    }

    @VisibleForTesting
    ExceptionReporter getExceptionReporter() {
        return this.exceptionReporter;
    }

    public void waitForResource() throws InterruptedException {
        this.inMemoryMerger.waitForMerge();
    }

    @VisibleForTesting
    protected boolean canShuffleToMemory(long requestedSize) {
        return requestedSize < this.maxSingleShuffleLimit;
    }

    public synchronized MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize, int fetcher) throws IOException {
        if (!this.canShuffleToMemory(requestedSize)) {
            LOG.info((Object)(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + this.maxSingleShuffleLimit + ")"));
            return new DirectOnDiskMapOutput(mapId, this.reduceId, this, requestedSize, this.jobConf, this.mapOutputFile, fetcher, true);
        }
        while (this.usedMemory > this.memoryLimit) {
            LOG.debug((Object)(mapId + ": Stalling shuffle since usedMemory (" + this.usedMemory + ") is greater than memoryLimit (" + this.memoryLimit + ")." + " CommitMemory is (" + this.commitMemory + ")"));
            try {
                LOG.info((Object)("fetcher#" + fetcher + " - MergeManager returned status WAIT ..."));
                this.wait();
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
        LOG.debug((Object)(mapId + ": Proceeding with shuffle since usedMemory (" + this.usedMemory + ") is lesser than memoryLimit (" + this.memoryLimit + ")." + "CommitMemory is (" + this.commitMemory + ")"));
        return this.unconditionalReserve(mapId, requestedSize, true);
    }

    @VisibleForTesting
    protected synchronized DirectInMemoryOutput<K, V> unconditionalReserve(TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
        this.usedMemory += requestedSize;
        return new DirectInMemoryOutput((Configuration)this.jobConf, mapId, this, (int)requestedSize, this.codec, primaryMapOutput);
    }

    synchronized void unreserve(long size) {
        this.usedMemory -= size;
        this.notifyAll();
    }

    public synchronized void closeInMemoryFile(DirectInMemoryOutput<K, V> mapOutput) {
        this.inMemoryMapOutputs.add(mapOutput);
        LOG.info((Object)("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() + ", inMemoryMapOutputs.size() -> " + this.inMemoryMapOutputs.size() + ", commitMemory -> " + this.commitMemory + ", usedMemory ->" + this.usedMemory));
        this.commitMemory += mapOutput.getSize();
        if (this.commitMemory >= this.mergeThreshold) {
            LOG.info((Object)("Starting inMemoryMerger's merge since commitMemory=" + this.commitMemory + " > mergeThreshold=" + this.mergeThreshold + ". Current usedMemory=" + this.usedMemory));
            this.inMemoryMapOutputs.addAll(this.inMemoryMergedMapOutputs);
            this.inMemoryMergedMapOutputs.clear();
            this.inMemoryMerger.startMerge(this.inMemoryMapOutputs);
            this.commitMemory = 0L;
        }
        if (this.memToMemMerger != null && this.inMemoryMapOutputs.size() >= this.memToMemMergeOutputsThreshold) {
            this.memToMemMerger.startMerge(this.inMemoryMapOutputs);
        }
    }

    public synchronized void closeInMemoryMergedFile(DirectInMemoryOutput<K, V> mapOutput) {
        this.inMemoryMergedMapOutputs.add(mapOutput);
        LOG.info((Object)("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + ", inMemoryMergedMapOutputs.size() -> " + this.inMemoryMergedMapOutputs.size()));
    }

    public synchronized void closeOnDiskFile(FileStatus fileStatus) {
        this.onDiskMapOutputs.add(fileStatus);
        if (this.onDiskMapOutputs.size() >= 2 * this.ioSortFactor - 1) {
            this.onDiskMerger.startMerge(this.onDiskMapOutputs);
        }
    }

    public RawKeyValueIterator close() throws Throwable {
        if (this.memToMemMerger != null) {
            this.memToMemMerger.close();
        }
        this.inMemoryMerger.close();
        this.onDiskMerger.close();
        ArrayList memory = new ArrayList(this.inMemoryMergedMapOutputs);
        this.inMemoryMergedMapOutputs.clear();
        memory.addAll(this.inMemoryMapOutputs);
        this.inMemoryMapOutputs.clear();
        ArrayList disk = new ArrayList(this.onDiskMapOutputs);
        this.onDiskMapOutputs.clear();
        return this.finalMerge(this.jobConf, this.rfs, memory, disk);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void combineAndSpill(RawKeyValueIterator kvIter, Counters.Counter inCounter) throws IOException {
        JobConf job = this.jobConf;
        Reducer combiner = (Reducer)ReflectionUtils.newInstance((Class)this.combinerClass, (Configuration)job);
        Class keyClass = job.getMapOutputKeyClass();
        Class valClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputKeyComparator();
        try {
            Task.CombineValuesIterator values = new Task.CombineValuesIterator(kvIter, comparator, keyClass, valClass, (Configuration)job, Reporter.NULL, inCounter);
            while (values.more()) {
                combiner.reduce(values.getKey(), (Iterator)values, (OutputCollector)this.combineCollector, Reporter.NULL);
                values.nextKey();
            }
        }
        finally {
            combiner.close();
        }
    }

    private long createInMemorySegments(List<DirectInMemoryOutput<K, V>> inMemoryMapOutputs, List<Merger.Segment<K, V>> inMemorySegments, long leaveBytes) throws IOException {
        long totalSize = 0L;
        long fullSize = 0L;
        for (DirectInMemoryOutput<K, V> mo : inMemoryMapOutputs) {
            fullSize += (long)mo.getMemory().length;
        }
        while (fullSize > leaveBytes) {
            DirectInMemoryOutput<K, V> mo = inMemoryMapOutputs.remove(0);
            byte[] data = mo.getMemory();
            long size = data.length;
            totalSize += size;
            fullSize -= size;
            DirectInMemoryReader reader = new DirectInMemoryReader(this, mo.getMapId(), data, 0, (int)size, (Configuration)this.jobConf);
            inMemorySegments.add(new Merger.Segment((IFile.Reader)reader, true, mo.isPrimaryMapOutput() ? this.mergedMapOutputsCounter : null));
        }
        return totalSize;
    }

    private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, List<DirectInMemoryOutput<K, V>> inMemoryMapOutputs, List<FileStatus> onDiskMapOutputs) throws IOException {
        FileStatus[] onDisk;
        LOG.info((Object)("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + onDiskMapOutputs.size() + " on-disk map-outputs"));
        float maxRedPer = job.getFloat("mapreduce.reduce.input.buffer.percent", 0.0f);
        if ((double)maxRedPer > 1.0 || (double)maxRedPer < 0.0) {
            throw new IOException("mapreduce.reduce.input.buffer.percent" + maxRedPer);
        }
        int maxInMemReduce = (int)Math.min((float)Runtime.getRuntime().maxMemory() * maxRedPer, 2.1474836E9f);
        Class keyClass = job.getMapOutputKeyClass();
        Class valueClass = job.getMapOutputValueClass();
        boolean keepInputs = job.getKeepFailedTaskFiles();
        Path tmpDir = ((MapRFsOutputFile)this.mapOutputFile).getLocalPathForWrite(this.reduceId.getTaskID().toString(), -1L);
        RawComparator comparator = job.getOutputKeyComparator();
        ArrayList memDiskSegments = new ArrayList();
        long inMemToDiskBytes = 0L;
        boolean mergePhaseFinished = false;
        if (inMemoryMapOutputs.size() > 0) {
            TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
            inMemToDiskBytes = this.createInMemorySegments(inMemoryMapOutputs, memDiskSegments, (long)maxInMemReduce);
            int numMemDiskSegments = memDiskSegments.size();
            if (numMemDiskSegments > 0 && this.ioSortFactor > onDiskMapOutputs.size()) {
                mergePhaseFinished = true;
                Path outputPath = this.mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes).suffix(Task.MERGED_OUTPUT_PREFIX);
                RawKeyValueIterator rIter = Merger.merge((Configuration)job, (FileSystem)fs, (Class)keyClass, (Class)valueClass, memDiskSegments, (int)numMemDiskSegments, (Path)tmpDir, (RawComparator)comparator, (Progressable)this.reporter, (Counters.Counter)this.spilledRecordsCounter, null, (Progress)this.mergePhase);
                IFile.Writer writer = new IFile.Writer((Configuration)job, fs.create(outputPath), keyClass, valueClass, this.codec, null, true);
                try {
                    Merger.writeFile((RawKeyValueIterator)rIter, (IFile.Writer)writer, (Progressable)this.reporter, (Configuration)job);
                    writer.close();
                    onDiskMapOutputs.add(fs.getFileStatus(outputPath));
                    writer = null;
                }
                catch (IOException e) {
                    if (null != outputPath) {
                        try {
                            fs.delete(outputPath, true);
                        }
                        catch (IOException ie) {
                            // empty catch block
                        }
                    }
                    throw e;
                }
                finally {
                    if (null != writer) {
                        writer.close();
                    }
                }
                LOG.info((Object)("Merged " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes to disk to satisfy " + "reduce memory limit"));
                inMemToDiskBytes = 0L;
                memDiskSegments.clear();
            } else if (inMemToDiskBytes != 0L) {
                LOG.info((Object)("Keeping " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes in memory for " + "intermediate, on-disk merge"));
            }
        }
        ArrayList<Object> diskSegments = new ArrayList<Object>();
        long onDiskBytes = inMemToDiskBytes;
        long rawBytes = inMemToDiskBytes;
        for (FileStatus file : onDisk = onDiskMapOutputs.toArray(new FileStatus[onDiskMapOutputs.size()])) {
            long fileLength = file.getLen();
            onDiskBytes += fileLength;
            rawBytes += fileLength;
            LOG.debug((Object)("Disk file: " + file + " Length is " + fileLength));
            diskSegments.add(new Merger.Segment((Configuration)job, fs, file.getPath(), this.codec, keepInputs, file.toString().endsWith(Task.MERGED_OUTPUT_PREFIX) ? null : this.mergedMapOutputsCounter, fileLength));
        }
        LOG.info((Object)("Merging " + onDisk.length + " files, " + onDiskBytes + " bytes from disk"));
        Collections.sort(diskSegments, new /* Unavailable Anonymous Inner Class!! */);
        ArrayList<Merger.Segment> finalSegments = new ArrayList<Merger.Segment>();
        long inMemBytes = this.createInMemorySegments(inMemoryMapOutputs, finalSegments, 0L);
        LOG.info((Object)("Merging " + finalSegments.size() + " segments, " + inMemBytes + " bytes from memory into reduce"));
        if (0L != onDiskBytes) {
            int numInMemSegments = memDiskSegments.size();
            diskSegments.addAll(0, memDiskSegments);
            memDiskSegments.clear();
            Progress thisPhase = mergePhaseFinished ? null : this.mergePhase;
            RawKeyValueIterator diskMerge = Merger.merge((Configuration)job, (FileSystem)fs, (Class)keyClass, (Class)valueClass, (CompressionCodec)this.codec, diskSegments, (int)this.ioSortFactor, (int)numInMemSegments, (Path)tmpDir, (RawComparator)comparator, (Progressable)this.reporter, (boolean)false, (Counters.Counter)this.spilledRecordsCounter, null, (Progress)thisPhase);
            diskSegments.clear();
            if (0 == finalSegments.size()) {
                return diskMerge;
            }
            finalSegments.add(new Merger.Segment((IFile.Reader)new RawKVIteratorReader(this, diskMerge, onDiskBytes), true, rawBytes));
        }
        return Merger.merge((Configuration)job, (FileSystem)fs, (Class)keyClass, (Class)valueClass, finalSegments, (int)finalSegments.size(), (Path)tmpDir, (RawComparator)comparator, (Progressable)this.reporter, (Counters.Counter)this.spilledRecordsCounter, null, null);
    }

    static /* synthetic */ ExceptionReporter access$000(DirectShuffleMergeManagerImpl x0) {
        return x0.exceptionReporter;
    }

    static /* synthetic */ long access$100(DirectShuffleMergeManagerImpl x0, List x1, List x2, long x3) throws IOException {
        return x0.createInMemorySegments(x1, x2, x3);
    }

    static /* synthetic */ Log access$200() {
        return LOG;
    }

    static /* synthetic */ JobConf access$300(DirectShuffleMergeManagerImpl x0) {
        return x0.jobConf;
    }

    static /* synthetic */ FileSystem access$400(DirectShuffleMergeManagerImpl x0) {
        return x0.rfs;
    }

    static /* synthetic */ TaskAttemptID access$500(DirectShuffleMergeManagerImpl x0) {
        return x0.reduceId;
    }

    static /* synthetic */ Reporter access$600(DirectShuffleMergeManagerImpl x0) {
        return x0.reporter;
    }

    static /* synthetic */ CompressionCodec access$700(DirectShuffleMergeManagerImpl x0) {
        return x0.codec;
    }

    static /* synthetic */ Counters.Counter access$800(DirectShuffleMergeManagerImpl x0) {
        return x0.spilledRecordsCounter;
    }

    static /* synthetic */ Class access$900(DirectShuffleMergeManagerImpl x0) {
        return x0.combinerClass;
    }

    static /* synthetic */ Task.CombineOutputCollector access$1000(DirectShuffleMergeManagerImpl x0) {
        return x0.combineCollector;
    }

    static /* synthetic */ Counters.Counter access$1100(DirectShuffleMergeManagerImpl x0) {
        return x0.reduceCombineInputCounter;
    }

    static /* synthetic */ void access$1200(DirectShuffleMergeManagerImpl x0, RawKeyValueIterator x1, Counters.Counter x2) throws IOException {
        x0.combineAndSpill(x1, x2);
    }

    static /* synthetic */ FileSystem access$1300(DirectShuffleMergeManagerImpl x0) {
        return x0.localFS;
    }

    static /* synthetic */ int access$1400(DirectShuffleMergeManagerImpl x0) {
        return x0.ioSortFactor;
    }

    static /* synthetic */ Counters.Counter access$1500(DirectShuffleMergeManagerImpl x0) {
        return x0.mergedMapOutputsCounter;
    }
}

