package org.apache.hadoop.mapreduce.task.reduce;

import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapRFsOutputFile;
import org.apache.hadoop.mapred.MapRIFileInputStream;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-mapreduce-client-contrib-2.3.0-mapr-4.0.0-beta.jar:org/apache/hadoop/mapreduce/task/reduce/DirectShuffleFetcher.class
 */
/* loaded from: input_file:classes/org/apache/hadoop/mapreduce/task/reduce/DirectShuffleFetcher.class */
/**
 * Copier thread used by the direct shuffle: it repeatedly asks the
 * {@link DirectShuffleSchedulerImpl} for a map-output location, opens that output
 * through the file system ({@code openFid2}) and hands the stream to a
 * {@link MapOutput} reserved from the {@link MergeManager}.
 *
 * <p>The thread runs until {@link #shutDown()} is called. Unexpected throwables
 * raised while fetching are forwarded to the {@link ExceptionReporter} and the
 * loop keeps going.
 *
 * @param <K> map output key type
 * @param <V> map output value type
 */
public class DirectShuffleFetcher<K, V> extends Thread {
    private static final Log LOG = LogFactory.getLog(DirectShuffleFetcher.class);
    private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";

    /** Returned by {@link #copyOutput} when it is handed a {@code null} location. */
    private static final int OBSOLETE = -2;

    /** Returned by {@link #copyOutput} when the map output could not be fetched. */
    private static final long COPY_FAILED = -1L;

    private final Counters.Counter ioErrs;
    private final TaskAttemptID reduceId;
    private final MapRFsOutputFile mapOutputFile;
    private final int id;
    private final Reporter reporter;
    private final int prefetchBytesHint;
    private CompressionCodec codec;
    private Decompressor decompressor;
    private final Configuration jobConf;
    private final MergeManager<K, V> merger;

    /** File system used to open map outputs; stays {@code null} if initialisation failed. */
    protected FileSystem rfs;

    private final ExceptionReporter exceptionReporter;
    private final ShuffleClientMetrics metrics;
    private final DirectShuffleSchedulerImpl<K, V> scheduler;

    // Never assigned a non-null value inside this class, so fail() currently always returns false.
    private MapOutputLocation currentLocation = null;

    /** Set by {@link #shutDown()}; volatile so the fetch loop observes it. */
    private volatile boolean shouldExit = false;

    // Always false in this class; when true copyOutput() would skip the file fetch entirely.
    private boolean useDirectReduce = false;

    /** Outcome categories passed to {@link #finish(long, CopyOutputErrorType)}. */
    private enum CopyOutputErrorType {
        NO_ERROR,
        READ_ERROR,
        OTHER_ERROR
    }

    /** Counter names inside the {@value #SHUFFLE_ERR_GRP_NAME} counter group. */
    private enum ShuffleErrors {
        IO_ERROR,
        WRONG_LENGTH,
        BAD_ID,
        WRONG_MAP,
        CONNECTION,
        WRONG_REDUCE
    }

    /**
     * Creates a fetcher thread named {@code "MapOutputCopier <taskId>.<i>"}.
     *
     * @param i index of this fetcher; used in the thread name, log messages and
     *        passed to {@code MergeManager.reserve}
     * @param jobConf job configuration (compression settings, prefetch hint)
     * @param taskAttemptID attempt id of the reduce task this fetcher works for
     * @param directShuffleSchedulerImpl scheduler that supplies locations and receives results
     * @param mergeManager merge manager that reserves the destination {@link MapOutput}
     * @param reporter source of the shuffle I/O error counter; also passed to {@code shuffle}
     * @param shuffleClientMetrics metrics sink for busy/free/success/failure events
     * @param exceptionReporter receives unexpected throwables from {@link #run()}
     * @param mapOutputFile must be a {@link MapRFsOutputFile}; it is cast unconditionally
     */
    public DirectShuffleFetcher(int i, JobConf jobConf, TaskAttemptID taskAttemptID, DirectShuffleSchedulerImpl<K, V> directShuffleSchedulerImpl, MergeManager<K, V> mergeManager, Reporter reporter, ShuffleClientMetrics shuffleClientMetrics, ExceptionReporter exceptionReporter, MapOutputFile mapOutputFile) {
        this.id = i;
        setName("MapOutputCopier " + taskAttemptID.getTaskID() + "." + i);
        LOG.debug(getName() + " created");
        this.reporter = reporter;
        this.ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString());
        this.exceptionReporter = exceptionReporter;
        this.reduceId = taskAttemptID;
        this.mapOutputFile = (MapRFsOutputFile) mapOutputFile;
        this.metrics = shuffleClientMetrics;
        this.jobConf = jobConf;
        this.merger = mergeManager;
        try {
            this.rfs = FileSystem.get(jobConf);
        } catch (IOException e) {
            // NOTE(review): the failure is only logged, so rfs stays null and every later
            // fetch will fail with a NullPointerException reported through exceptionReporter.
            LOG.error("Unable to init filesystem", e);
        }
        this.scheduler = directShuffleSchedulerImpl;
        this.prefetchBytesHint = jobConf.getInt("maprfs.openfid2.prefetch.bytes", 0);
        if (jobConf.getCompressMapOutput()) {
            this.codec = (CompressionCodec) ReflectionUtils.newInstance(jobConf.getMapOutputCompressorClass(DefaultCodec.class), jobConf);
            this.decompressor = CodecPool.getDecompressor(this.codec);
        }
    }

    /**
     * Marks the copy currently in progress as failed.
     *
     * @return {@code true} if a location was in progress and has been finished,
     *         {@code false} if there was nothing to fail
     */
    public synchronized boolean fail() {
        if (this.currentLocation == null) {
            return false;
        }
        finish(-1L, CopyOutputErrorType.OTHER_ERROR);
        return true;
    }

    /**
     * @return the location currently being copied, or {@code null} if none
     */
    public synchronized MapOutputLocation getLocation() {
        return this.currentLocation;
    }

    /**
     * Clears the current location, logging it at debug level first.
     *
     * @param j value included in the debug message
     * @param copyOutputErrorType outcome of the copy; currently not used
     */
    private synchronized void finish(long j, CopyOutputErrorType copyOutputErrorType) {
        if (this.currentLocation != null) {
            LOG.debug(getName() + " finishing " + this.currentLocation + " =" + j);
            this.currentLocation = null;
        }
    }

    /** Asks the fetch loop to stop and interrupts the thread so a blocking wait returns. */
    public void shutDown() {
        this.shouldExit = true;
        interrupt();
    }

    /**
     * Fetch loop: waits for merge resources, obtains the next location from the
     * scheduler, and copies it. Runs until {@link #shutDown()} is called.
     *
     * <p>{@code threadFree()} is reported exactly once per iteration in which a
     * non-null location was obtained, whatever the outcome. The decompressor, if
     * any, is returned to the {@link CodecPool} when the loop ends for any reason.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            while (!this.shouldExit) {
                try {
                    MapOutputLocation location = null;
                    try {
                        this.merger.waitForResource();
                        location = this.scheduler.getLocation();
                        this.metrics.threadBusy();
                        copyOutput(location);
                    } catch (IOException e) {
                        // Guard the message: location may still be null at this point.
                        String source = (location == null)
                                ? "unknown map output location"
                                : location.getTaskAttemptId() + " from " + location.getHost();
                        LOG.warn(this.reduceId.getTaskID() + " copy failed: " + source, e);
                        this.metrics.failedFetch();
                    } catch (InterruptedException e) {
                        // An interrupt is expected only as part of shutDown(); otherwise log it
                        // and keep fetching. The interrupt flag is deliberately not restored,
                        // because that would make the next blocking wait fail immediately.
                        if (!this.shouldExit) {
                            LOG.warn("Unexpected InterruptedException");
                        }
                    } finally {
                        if (location != null) {
                            this.metrics.threadFree();
                        }
                    }
                } catch (Throwable t) {
                    // Anything unexpected is reported; the loop then re-checks shouldExit.
                    this.exceptionReporter.reportException(t);
                }
            }
        } finally {
            if (this.decompressor != null) {
                CodecPool.returnDecompressor(this.decompressor);
            }
        }
    }

    /**
     * Copies the map output at the given location.
     *
     * <p>On failure the scheduler is told the copy failed and the location is
     * re-registered with {@code addKnownMapOutput}. A fetched output of size 0 is
     * aborted before returning.
     *
     * @param mapOutputLocation location to copy; may be {@code null}
     * @return {@code -2} if {@code mapOutputLocation} is {@code null}, {@code -1} if
     *         the output could not be fetched, otherwise the size reported by the
     *         fetched {@link MapOutput}
     * @throws IOException declared for callers; in this class it can only originate from
     *         resolving the local input file name, if that call throws it
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    @VisibleForTesting
    public long copyOutput(MapOutputLocation mapOutputLocation) throws IOException, InterruptedException {
        if (mapOutputLocation == null) {
            return OBSOLETE;
        }
        MapOutput<K, V> mapOutput = null;
        if (!this.useDirectReduce) {
            mapOutput = getMapOutputFromFile(mapOutputLocation, new Path(this.mapOutputFile.getInputFile(mapOutputLocation.getTaskId().getId(), org.apache.hadoop.mapred.TaskAttemptID.downgrade(this.reduceId)) + "-" + this.id), this.reduceId.getTaskID().getId());
        }
        if (mapOutput == null) {
            this.scheduler.copyFailed(mapOutputLocation.getTaskAttemptId(), mapOutputLocation);
            this.scheduler.addKnownMapOutput(mapOutputLocation.getHost(), mapOutputLocation.getTaskAttemptId(), mapOutputLocation.getShuffleRootFid());
            return COPY_FAILED;
        }
        long size = mapOutput.getSize();
        if (size == 0) {
            mapOutput.abort();
        }
        return size;
    }

    /**
     * Opens the map output file for the given location and shuffles it into a
     * {@link MapOutput} reserved from the merge manager.
     *
     * <p>Error handling:
     * <ul>
     *   <li>{@link FileNotFoundException} while opening or reading the file header:
     *       logged, {@code null} returned.</li>
     *   <li>Other {@link IOException} in that phase: logged, I/O error counter
     *       incremented, {@code null} returned.</li>
     *   <li>{@link IOException} (or {@link InternalError}, which is wrapped) while
     *       reserving or shuffling: I/O error counter incremented and the error
     *       reported to the scheduler via {@code reportLocalError}; {@code null}
     *       returned.</li>
     * </ul>
     * The input stream is closed on every exit path, including unchecked exceptions.
     *
     * @param mapOutputLocation location of the map output to read
     * @param path local destination path computed by the caller; currently not used
     * @param i reduce partition id used to resolve the relative output file name
     * @return the populated map output, or {@code null} on any handled failure
     */
    private MapOutput<K, V> getMapOutputFromFile(MapOutputLocation mapOutputLocation, Path path, int i) {
        final org.apache.hadoop.mapred.TaskAttemptID mapAttemptId =
                org.apache.hadoop.mapred.TaskAttemptID.downgrade(mapOutputLocation.getTaskAttemptId());
        // Only used to make the error messages below more helpful.
        final String filePath = mapOutputLocation.shuffleRootFid.getFid() + "/" + this.mapOutputFile.getRelOutputFile(mapAttemptId, i);
        final long startMillis = System.currentTimeMillis();
        FSDataInputStream input = null;
        try {
            final long openStartMillis = LOG.isDebugEnabled() ? System.currentTimeMillis() : 0L;
            input = this.rfs.openFid2(mapOutputLocation.shuffleRootFid, this.mapOutputFile.getRelOutputFile(mapAttemptId, i), this.prefetchBytesHint);
            if (LOG.isDebugEnabled()) {
                LOG.debug("MapR-DBG: openFid2 " + input + " took " + (System.currentTimeMillis() - openStartMillis));
            }
            final long fileLength = input.getFileLength();
            final MapRIFileInputStream ifileInput = new MapRIFileInputStream(input, fileLength, this.jobConf);
            final long mapOutputSize = ifileInput.getMapOutputFileInfo().getMapOutputSize();
            final long fileBytesWritten = ifileInput.getMapOutputFileInfo().getFileBytesWritten();
            try {
                final MapOutput<K, V> reserved = this.merger.reserve(mapAttemptId, mapOutputSize, this.id);
                if (reserved == null) {
                    LOG.error("mapOutput can not be returned as null from reserve");
                    return null;
                }
                try {
                    LOG.info("fetcher#" + this.id + " about to shuffle output of map " + reserved.getMapId() + " to " + reserved.getDescription());
                    // Direct on-disk outputs are given the whole file length; every other
                    // output type is given the map output size read from the file info.
                    final long shuffleLength = (reserved instanceof DirectOnDiskMapOutput) ? fileLength : mapOutputSize;
                    reserved.shuffle(new MapHost(mapOutputLocation.getHost(), ""), input, shuffleLength, fileBytesWritten, this.metrics, this.reporter);
                    this.scheduler.copySucceeded(reserved.getMapId(), mapOutputLocation, reserved.getSize(), System.currentTimeMillis() - startMillis, reserved);
                    this.metrics.successFetch();
                    final long closeStartMillis = LOG.isDebugEnabled() ? System.currentTimeMillis() : 0L;
                    input.close();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("MapR-DBG: close " + input + " took " + (System.currentTimeMillis() - closeStartMillis));
                    }
                    // Closed successfully: nothing left for the finally block to clean up.
                    input = null;
                    return reserved;
                } catch (InternalError e) {
                    LOG.warn("Failed to shuffle for fetcher#" + this.id, e);
                    throw new IOException(e);
                }
            } catch (IOException e) {
                // NOTE(review): the reserved output is not aborted here; confirm whether
                // reportLocalError makes that unnecessary.
                this.ioErrs.increment(1L);
                this.scheduler.reportLocalError(e);
                return null;
            }
        } catch (FileNotFoundException e) {
            LOG.error("FileNotFoundException reading map output of task " + mapOutputLocation.getTaskAttemptId() + " for reduce " + i + " file path " + filePath, e);
            return null;
        } catch (IOException e) {
            LOG.error("IOException reading map output of task " + mapOutputLocation.getTaskAttemptId() + " for reduce " + i + " file path " + filePath, e);
            this.ioErrs.increment(1L);
            return null;
        } finally {
            closeQuietly(input);
        }
    }

    /** Closes {@code stream} if it is non-null, logging rather than propagating any failure. */
    private static void closeQuietly(Closeable stream) {
        if (stream != null) {
            IOUtils.cleanup(LOG, stream);
        }
    }
}
