/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.apache.tez.runtime.library.common.shuffle.LocalDiskFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;

public class Fetcher
implements Callable<FetchResult> {
    private static final Log LOG = LogFactory.getLog(Fetcher.class);
    private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
    private final Configuration conf;
    private CompressionCodec codec;
    private boolean ifileReadAhead = true;
    private int ifileReadAheadLength = 0x400000;
    private final JobTokenSecretManager jobTokenSecretMgr;
    private final FetcherCallback fetcherCallback;
    private final FetchedInputAllocator inputManager;
    private final ApplicationId appId;
    private final String logIdentifier;
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    private final int fetcherIdentifier;
    private List<InputAttemptIdentifier> srcAttempts;
    private String host;
    private int port;
    private int partition;
    private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
    private LinkedHashSet<InputAttemptIdentifier> remaining;
    private URL url;
    private volatile DataInputStream input;
    private HttpConnection httpConnection;
    private HttpConnection.HttpConnectionParams httpConnectionParams;
    private final boolean localDiskFetchEnabled;
    private final boolean sharedFetchEnabled;
    private final LocalDirAllocator localDirAllocator;
    private final Path lockPath;
    private final RawLocalFileSystem localFs;
    private long retryStartTime = 0L;
    private final boolean isDebugEnabled = LOG.isDebugEnabled();

    private Fetcher(FetcherCallback fetcherCallback, HttpConnection.HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled) {
        this.fetcherCallback = fetcherCallback;
        this.inputManager = inputManager;
        this.jobTokenSecretMgr = jobTokenSecretManager;
        this.appId = appId;
        this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
        this.httpConnectionParams = params;
        this.conf = conf;
        this.localDiskFetchEnabled = localDiskFetchEnabled;
        this.sharedFetchEnabled = sharedFetchEnabled;
        this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
        this.logIdentifier = " fetcher [" + srcNameTrimmed + "] " + this.fetcherIdentifier;
        this.localFs = localFs;
        this.localDirAllocator = localDirAllocator;
        this.lockPath = lockPath;
        try {
            if (this.sharedFetchEnabled) {
                this.localFs.mkdirs(this.lockPath);
            }
        }
        catch (Exception e) {
            LOG.warn((Object)("Error initializing local dirs for shared transfer " + e));
        }
    }

    @Override
    public FetchResult call() throws Exception {
        boolean multiplex;
        boolean bl = multiplex = this.sharedFetchEnabled && this.localDiskFetchEnabled;
        if (this.srcAttempts.size() == 0) {
            return new FetchResult(this.host, this.port, this.partition, this.srcAttempts);
        }
        for (InputAttemptIdentifier in : this.srcAttempts) {
            this.pathToAttemptMap.put(in.getPathComponent(), in);
            multiplex &= in.isShared();
        }
        if (multiplex) {
            Preconditions.checkArgument((this.partition == 0 ? 1 : 0) != 0, (String)"Shared fetches cannot be done for partitioned input- partition is non-zero (%d)", (Object[])new Object[]{this.partition});
        }
        this.remaining = new LinkedHashSet<InputAttemptIdentifier>(this.srcAttempts);
        HostFetchResult hostFetchResult = this.localDiskFetchEnabled && this.host.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString())) ? this.setupLocalDiskFetch() : (multiplex ? this.doSharedFetch() : this.doHttpFetch());
        if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
            if (!this.isShutDown.get()) {
                LOG.warn((Object)("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs)));
                for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
                    this.fetcherCallback.fetchFailed(this.host, left, hostFetchResult.connectFailed);
                }
            } else {
                LOG.info((Object)("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length + " inputs since the fetcher has already been stopped"));
            }
        }
        this.shutdown();
        if (hostFetchResult.failedInputs == null && !this.remaining.isEmpty()) {
            if (!multiplex) {
                throw new IOException("server didn't return all expected map outputs: " + this.remaining.size() + " left.");
            }
            LOG.info((Object)("Shared fetch failed to return " + this.remaining.size() + " inputs on this try"));
        }
        return hostFetchResult.fetchResult;
    }

    private int findInputs() throws IOException {
        int k = 0;
        for (InputAttemptIdentifier src : this.srcAttempts) {
            try {
                if (this.getShuffleInputFileName(src.getPathComponent(), ".index") == null) continue;
                ++k;
            }
            catch (DiskChecker.DiskErrorException de) {}
        }
        return k;
    }

    private FileLock getLock() throws OverlappingFileLockException, InterruptedException, IOException {
        File lockFile = this.localFs.pathToFile(new Path(this.lockPath, this.host + ".lock"));
        boolean created = lockFile.createNewFile();
        if (!created && !lockFile.exists()) {
            return null;
        }
        FileChannel lockChannel = new RandomAccessFile(lockFile, "rws").getChannel();
        FileLock xlock = null;
        xlock = lockChannel.tryLock(0L, Long.MAX_VALUE, false);
        if (xlock != null) {
            return xlock;
        }
        lockChannel.close();
        return null;
    }

    private void releaseLock(FileLock lock) throws IOException {
        if (lock != null && lock.isValid()) {
            FileChannel lockChannel = lock.channel();
            lock.release();
            lockChannel.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected HostFetchResult doSharedFetch() throws IOException {
        int inputs = this.findInputs();
        if (inputs == this.srcAttempts.size()) {
            if (this.isDebugEnabled) {
                LOG.debug((Object)"Using the copies found locally");
            }
            return this.doLocalDiskFetch(true);
        }
        if (inputs > 0) {
            if (this.isDebugEnabled) {
                LOG.debug((Object)("Found " + this.input + " local fetches right now, using them first"));
            }
            return this.doLocalDiskFetch(false);
        }
        FileLock lock = null;
        try {
            lock = this.getLock();
            if (lock == null) {
                LOG.info((Object)("Requeuing " + this.host + ":" + this.port + " downloads because we didn't get a lock"));
                HostFetchResult hostFetchResult = new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
                return hostFetchResult;
            }
            if (this.findInputs() == this.srcAttempts.size()) {
                this.releaseLock(lock);
                lock = null;
                HostFetchResult hostFetchResult = this.doLocalDiskFetch(true);
                return hostFetchResult;
            }
            HostFetchResult hostFetchResult = this.doHttpFetch(new CachingCallBack());
            return hostFetchResult;
        }
        catch (OverlappingFileLockException jvmCrossLock) {
            LOG.warn((Object)("Double locking detected for " + this.host));
        }
        catch (InterruptedException sleepInterrupted) {
            LOG.warn((Object)("Lock was interrupted for " + this.host));
        }
        finally {
            this.releaseLock(lock);
        }
        if (this.isShutDown.get()) {
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
        }
        return this.doHttpFetch();
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch() {
        return this.doHttpFetch(null);
    }

    private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) {
        try {
            StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(this.host, this.port, this.partition, this.appId.toString(), this.httpConnectionParams.isSSLShuffleEnabled());
            this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, this.httpConnectionParams.getKeepAlive());
            this.httpConnection = new HttpConnection(this.url, this.httpConnectionParams, this.logIdentifier, this.jobTokenSecretMgr);
            this.httpConnection.connect();
        }
        catch (IOException e) {
            InputAttemptIdentifier[] failedFetches = null;
            if (this.isShutDown.get()) {
                LOG.info((Object)("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage()));
            } else {
                failedFetches = this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), failedFetches, true);
        }
        if (this.isShutDown.get()) {
            this.shutdownInternal();
            LOG.info((Object)"Detected fetcher has been shutdown after connection establishment. Returning");
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
        }
        try {
            this.input = this.httpConnection.getInputStream();
            this.httpConnection.validate();
        }
        catch (IOException e) {
            if (this.isShutDown.get()) {
                LOG.info((Object)("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage()));
            }
            InputAttemptIdentifier firstAttempt = attempts.get(0);
            LOG.warn((Object)("Fetch Failure from host while connecting: " + this.host + ", attempt: " + firstAttempt + " Informing ShuffleManager: "), (Throwable)e);
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), new InputAttemptIdentifier[]{firstAttempt}, false);
        }
        return null;
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch(CachingCallBack callback) {
        HostFetchResult connectionsWithRetryResult = this.setupConnection(this.srcAttempts);
        if (connectionsWithRetryResult != null) {
            return connectionsWithRetryResult;
        }
        if (this.isShutDown.get()) {
            this.shutdownInternal();
            LOG.info((Object)"Detected fetcher has been shutdown after opening stream. Returning");
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
        }
        InputAttemptIdentifier[] failedInputs = null;
        while (!this.remaining.isEmpty() && failedInputs == null) {
            if (this.isShutDown.get()) {
                this.shutdownInternal(true);
                LOG.info((Object)("Fetcher already shutdown. Aborting queued fetches for " + this.remaining.size() + " inputs"));
                return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
            }
            try {
                failedInputs = this.fetchInputs(this.input, callback);
            }
            catch (FetcherReadTimeoutException e) {
                this.shutdownInternal(true);
                if (this.isShutDown.get()) {
                    LOG.info((Object)("Fetcher already shutdown. Aborting reconnection and queued fetches for " + this.remaining.size() + " inputs"));
                    return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
                }
                connectionsWithRetryResult = this.setupConnection(new LinkedList<InputAttemptIdentifier>(this.remaining));
                if (connectionsWithRetryResult == null) continue;
                break;
            }
        }
        if (this.isShutDown.get() && failedInputs != null && failedInputs.length > 0) {
            LOG.info((Object)("Fetcher already shutdown. Not reporting fetch failures for: " + (failedInputs == null ? 0 : failedInputs.length) + " failed inputs"));
            failedInputs = null;
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), failedInputs, false);
    }

    @VisibleForTesting
    protected HostFetchResult setupLocalDiskFetch() {
        return this.doLocalDiskFetch(true);
    }

    @VisibleForTesting
    private HostFetchResult doLocalDiskFetch(boolean failMissing) {
        Iterator iterator = this.remaining.iterator();
        while (iterator.hasNext()) {
            if (this.isShutDown.get()) {
                LOG.info((Object)("Already shutdown. Skipping fetch for " + this.remaining.size() + " inputs"));
                break;
            }
            InputAttemptIdentifier srcAttemptId = (InputAttemptIdentifier)iterator.next();
            long startTime = System.currentTimeMillis();
            LocalDiskFetchedInput fetchedInput = null;
            try {
                TezIndexRecord idxRecord = this.getTezIndexRecord(srcAttemptId);
                fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(), idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId, this.getShuffleInputFileName(srcAttemptId.getPathComponent(), null), this.conf, new FetchedInputCallback(){

                    @Override
                    public void fetchComplete(FetchedInput fetchedInput) {
                    }

                    @Override
                    public void fetchFailed(FetchedInput fetchedInput) {
                    }

                    @Override
                    public void freeResources(FetchedInput fetchedInput) {
                    }
                });
                LOG.info((Object)("fetcher about to shuffle output of srcAttempt (direct disk)" + srcAttemptId + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength() + " to " + (Object)((Object)fetchedInput.getType())));
                long endTime = System.currentTimeMillis();
                this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, idxRecord.getPartLength(), idxRecord.getRawLength(), endTime - startTime);
                iterator.remove();
            }
            catch (IOException e) {
                this.cleanupFetchedInput(fetchedInput);
                if (this.isShutDown.get()) {
                    LOG.info((Object)("Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId + " from host " + this.host + " : " + e.getClass().getName() + ", message=" + e.getMessage()));
                    break;
                }
                LOG.warn((Object)("Failed to shuffle output of " + srcAttemptId + " from " + this.host + "(local fetch)"), (Throwable)e);
            }
        }
        InputAttemptIdentifier[] failedFetches = null;
        if (failMissing && this.remaining.size() > 0) {
            if (this.isShutDown.get()) {
                LOG.info((Object)("Already shutdown, not reporting fetch failures for: " + this.remaining.size() + " remaining inputs"));
            } else {
                failedFetches = this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), failedFetches, false);
    }

    @VisibleForTesting
    protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws IOException {
        Path indexFile = this.getShuffleInputFileName(srcAttemptId.getPathComponent(), ".index");
        TezSpillRecord spillRecord = new TezSpillRecord(indexFile, this.conf);
        TezIndexRecord idxRecord = spillRecord.getIndex(this.partition);
        return idxRecord;
    }

    private static final String getMapOutputFile(String pathComponent) {
        return "output/" + pathComponent + "/" + "file.out";
    }

    @VisibleForTesting
    protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
        suffix = suffix != null ? suffix : "";
        String pathFromLocalDir = Fetcher.getMapOutputFile(pathComponent) + suffix;
        return this.localDirAllocator.getLocalPathToRead(pathFromLocalDir, this.conf);
    }

    public void shutdown() {
        if (!this.isShutDown.getAndSet(true)) {
            this.shutdownInternal();
        }
    }

    private void shutdownInternal() {
        this.shutdownInternal(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shutdownInternal(boolean disconnect) {
        AtomicBoolean atomicBoolean = this.isShutDown;
        synchronized (atomicBoolean) {
            block6: {
                try {
                    if (this.httpConnection != null) {
                        this.httpConnection.cleanup(disconnect);
                    }
                }
                catch (IOException e) {
                    LOG.info((Object)("Exception while shutting down fetcher on " + this.logIdentifier + " : " + e.getMessage()));
                    if (!LOG.isDebugEnabled()) break block6;
                    LOG.debug((Object)e);
                }
            }
        }
    }

    private InputAttemptIdentifier[] fetchInputs(DataInputStream input, CachingCallBack callback) throws FetcherReadTimeoutException {
        FetchedInput fetchedInput = null;
        InputAttemptIdentifier srcAttemptId = null;
        long decompressedLength = -1L;
        long compressedLength = -1L;
        try {
            long startTime = System.currentTimeMillis();
            int responsePartition = -1;
            String pathComponent = null;
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(input);
                pathComponent = header.getMapId();
                srcAttemptId = this.pathToAttemptMap.get(pathComponent);
                compressedLength = header.getCompressedLength();
                decompressedLength = header.getUncompressedLength();
                responsePartition = header.getPartition();
            }
            catch (IllegalArgumentException e) {
                if (!this.isShutDown.get()) {
                    LOG.warn((Object)"Invalid src id ", (Throwable)e);
                    return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
                }
                LOG.info((Object)("Already shutdown. Ignoring badId error with message: " + e.getMessage()));
                return null;
            }
            if (!this.verifySanity(compressedLength, decompressedLength, responsePartition, srcAttemptId, pathComponent)) {
                if (!this.isShutDown.get()) {
                    if (srcAttemptId == null) {
                        LOG.warn((Object)("Was expecting " + this.getNextRemainingAttempt() + " but got null"));
                        srcAttemptId = this.getNextRemainingAttempt();
                    }
                    assert (srcAttemptId != null);
                    return new InputAttemptIdentifier[]{srcAttemptId};
                }
                LOG.info((Object)"Already shutdown. Ignoring verification failure.");
                return null;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("header: " + srcAttemptId + ", len: " + compressedLength + ", decomp len: " + decompressedLength));
            }
            fetchedInput = srcAttemptId.isShared() && callback != null ? this.inputManager.allocateType(FetchedInput.Type.DISK, decompressedLength, compressedLength, srcAttemptId) : this.inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
            LOG.info((Object)("fetcher about to shuffle output of srcAttempt " + fetchedInput.getInputAttemptIdentifier() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + (Object)((Object)fetchedInput.getType())));
            if (fetchedInput.getType() == FetchedInput.Type.MEMORY) {
                ShuffleUtils.shuffleToMemory(((MemoryFetchedInput)fetchedInput).getBytes(), input, (int)decompressedLength, (int)compressedLength, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG, fetchedInput.getInputAttemptIdentifier().toString());
            } else if (fetchedInput.getType() == FetchedInput.Type.DISK) {
                ShuffleUtils.shuffleToDisk(((DiskFetchedInput)fetchedInput).getOutputStream(), this.host + ":" + this.port, input, compressedLength, LOG, fetchedInput.getInputAttemptIdentifier().toString());
            } else {
                throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + fetchedInput);
            }
            if (srcAttemptId.isShared() && callback != null) {
                callback.cache(this.host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
            }
            long endTime = System.currentTimeMillis();
            this.retryStartTime = 0L;
            this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, compressedLength, decompressedLength, endTime - startTime);
            this.remaining.remove(srcAttemptId);
            return null;
        }
        catch (IOException ioe) {
            if (this.isShutDown.get()) {
                this.cleanupFetchedInput(fetchedInput);
                LOG.info((Object)("Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() + ", Message: " + ioe.getMessage()));
                return null;
            }
            if (this.shouldRetry(srcAttemptId, ioe)) {
                this.cleanupFetchedInput(fetchedInput);
                throw new FetcherReadTimeoutException(ioe);
            }
            if (srcAttemptId == null || fetchedInput == null) {
                LOG.info((Object)("fetcher failed to read map header" + srcAttemptId + " decomp: " + decompressedLength + ", " + compressedLength), (Throwable)ioe);
                this.cleanupFetchedInput(fetchedInput);
                if (srcAttemptId == null) {
                    return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
                }
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            LOG.warn((Object)("Failed to shuffle output of " + srcAttemptId + " from " + this.host), (Throwable)ioe);
            this.cleanupFetchedInput(fetchedInput);
            return new InputAttemptIdentifier[]{srcAttemptId};
        }
    }

    private void cleanupFetchedInput(FetchedInput fetchedInput) {
        if (fetchedInput != null) {
            try {
                fetchedInput.abort();
            }
            catch (IOException e) {
                LOG.info((Object)("Failure to cleanup fetchedInput: " + fetchedInput));
            }
        }
    }

    private boolean shouldRetry(InputAttemptIdentifier srcAttemptId, IOException ioe) {
        if (!(ioe instanceof SocketTimeoutException)) {
            return false;
        }
        long currentTime = System.currentTimeMillis();
        if (this.retryStartTime == 0L) {
            this.retryStartTime = currentTime;
        }
        if (currentTime - this.retryStartTime < (long)this.httpConnectionParams.getReadTimeout()) {
            LOG.warn((Object)("Shuffle output from " + srcAttemptId + " failed, retry it."));
            return true;
        }
        LOG.warn((Object)("Timeout for copying MapOutput with retry on host " + this.host + "after " + this.httpConnectionParams.getReadTimeout() + "milliseconds."));
        return false;
    }

    private boolean verifySanity(long compressedLength, long decompressedLength, int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
        if (compressedLength < 0L || decompressedLength < 0L) {
            LOG.warn((Object)(" invalid lengths in input header -> headerPathComponent: " + pathComponent + ", nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + ", decomp len: " + decompressedLength));
            return false;
        }
        if (fetchPartition != this.partition) {
            LOG.warn((Object)(" data for the wrong reduce -> headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + " decomp len: " + decompressedLength + " for reduce " + fetchPartition));
            return false;
        }
        if (!this.remaining.contains(srcAttemptId)) {
            LOG.warn((Object)("Invalid input. Received output for headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId));
            return false;
        }
        return true;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.remaining.size() > 0) {
            return (InputAttemptIdentifier)this.remaining.iterator().next();
        }
        return null;
    }

    public int hashCode() {
        return this.fetcherIdentifier;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (this.getClass() != obj.getClass()) {
            return false;
        }
        Fetcher other = (Fetcher)obj;
        return this.fetcherIdentifier == other.fetcherIdentifier;
    }

    public static class FetcherBuilder {
        private Fetcher fetcher;
        private boolean workAssigned = false;

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnection.HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled) {
            this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled, false);
        }

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnection.HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled) {
            this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled);
        }

        public FetcherBuilder setHttpConnectionParameters(HttpConnection.HttpConnectionParams httpParams) {
            this.fetcher.httpConnectionParams = httpParams;
            return this;
        }

        public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
            this.fetcher.codec = codec;
            return this;
        }

        public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
            this.fetcher.ifileReadAhead = readAhead;
            this.fetcher.ifileReadAheadLength = readAheadBytes;
            return this;
        }

        public FetcherBuilder assignWork(String host, int port, int partition, List<InputAttemptIdentifier> inputs) {
            this.fetcher.host = host;
            this.fetcher.port = port;
            this.fetcher.partition = partition;
            this.fetcher.srcAttempts = inputs;
            this.workAssigned = true;
            return this;
        }

        public Fetcher build() {
            Preconditions.checkState((this.workAssigned ? 1 : 0) != 0, (Object)"Cannot build a fetcher withot assigning work to it");
            return this.fetcher;
        }
    }

    static class HostFetchResult {
        private final FetchResult fetchResult;
        private final InputAttemptIdentifier[] failedInputs;
        private final boolean connectFailed;

        public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] failedInputs, boolean connectFailed) {
            this.fetchResult = fetchResult;
            this.failedInputs = failedInputs;
            this.connectFailed = connectFailed;
        }
    }

    private final class CachingCallBack {
        private CachingCallBack() {
        }

        public void cache(String host, InputAttemptIdentifier srcAttemptId, FetchedInput fetchedInput, long compressedLength, long decompressedLength) {
            try {
                TezIndexRecord indexRec;
                Preconditions.checkArgument((Fetcher.this.partition == 0 ? 1 : 0) != 0, (Object)"Partition == 0");
                String tmpSuffix = "" + System.currentTimeMillis() + ".tmp";
                String finalOutput = Fetcher.getMapOutputFile(srcAttemptId.getPathComponent());
                Path outputPath = Fetcher.this.localDirAllocator.getLocalPathForWrite(finalOutput, compressedLength, Fetcher.this.conf);
                TezSpillRecord spillRec = new TezSpillRecord(1);
                Path tmpIndex = outputPath.suffix(".index" + tmpSuffix);
                if (Fetcher.this.localFs.exists(tmpIndex)) {
                    LOG.warn((Object)("Found duplicate instance of input index file " + tmpIndex));
                    return;
                }
                Path tmpPath = null;
                switch (fetchedInput.getType()) {
                    case DISK: {
                        DiskFetchedInput input = (DiskFetchedInput)fetchedInput;
                        indexRec = new TezIndexRecord(0L, decompressedLength, compressedLength);
                        Fetcher.this.localFs.mkdirs(outputPath.getParent());
                        tmpPath = outputPath.suffix(tmpSuffix);
                        Fetcher.this.localFs.copyFromLocalFile(input.getInputPath(), tmpPath);
                        boolean renamed = Fetcher.this.localFs.rename(tmpPath, outputPath);
                        if (renamed) break;
                        LOG.warn((Object)("Could not rename to cached file name " + outputPath));
                        Fetcher.this.localFs.delete(tmpPath, false);
                        return;
                    }
                    default: {
                        LOG.warn((Object)("Incorrect use of CachingCallback for " + srcAttemptId));
                        return;
                    }
                }
                spillRec.putIndex(indexRec, 0);
                spillRec.writeToFile(tmpIndex, Fetcher.this.conf);
                boolean renamed = Fetcher.this.localFs.rename(tmpIndex, outputPath.suffix(".index"));
                if (!renamed) {
                    Fetcher.this.localFs.delete(tmpIndex, false);
                    if (outputPath != null) {
                        Fetcher.this.localFs.delete(outputPath, false);
                    }
                    LOG.warn((Object)("Could not rename the index file to " + outputPath.suffix(".index")));
                    return;
                }
            }
            catch (IOException ioe) {
                LOG.warn((Object)("Cache threw an error " + ioe));
            }
        }
    }
}

