package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.Fetcher;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
import org.apache.tez.runtime.library.common.shuffle.InputHost;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.class */
public class ShuffleManager implements FetcherCallback {
    // Class logger; no initializer here, so it is assigned in a static initializer elsewhere in the class.
    private static final Logger LOG;
    // Context of the owning input: source of counters, and the sink for failure/progress reports.
    private final InputContext inputContext;
    // Number of inputs that must complete; the scheduler loop runs while numCompletedInputs < numInputs.
    private final int numInputs;
    // Allocator handed to every Fetcher builder; run() requires it to be non-null.
    private final FetchedInputAllocator inputManager;
    // Fixed pool of numFetchers daemon threads that execute Fetcher tasks.
    private final ListeningExecutorService fetcherExecutor;
    // Single daemon thread that executes schedulerCallable.
    private final ListeningExecutorService schedulerExecutor;
    private final RunShuffleCallable schedulerCallable;
    // Input identifiers consulted to skip already-completed inputs; also used as the monitor around commits.
    private final Set<Integer> completedInputSet;
    // Running total of fetched bytes; updated while holding `lock`.
    private long totalBytesShuffledTillNow;
    // min(configured parallel copies, numInputs).
    private final int numFetchers;
    private final boolean asyncHttp;
    private final JobTokenSecretManager jobTokenSecretMgr;
    // May be null, in which case no compression parameters are set on the fetcher builder.
    private final CompressionCodec codec;
    private final boolean localDiskFetchEnabled;
    // When true, each fetcher is given a "locks" path on one of the local disks.
    private final boolean sharedFetchEnabled;
    private final boolean verifyDiskChecksum;
    private final int ifileBufferSize;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    // Cleaned-up source vertex name; used as the prefix of log lines and in thread names.
    private final String srcNameTrimmed;
    // Upper bound on inputs assigned to one fetcher at a time; clamped to [1, 75] in the constructor.
    private final int maxTaskOutputAtOnce;
    // Task counters looked up once from the input context in the constructor.
    private final TezCounter shuffledInputsCounter;
    private final TezCounter failedShufflesCounter;
    private final TezCounter bytesShuffledCounter;
    private final TezCounter decompressedDataSizeCounter;
    private final TezCounter bytesShuffledToDiskCounter;
    private final TezCounter bytesShuffledToMemCounter;
    private final TezCounter bytesShuffledDirectDiskCounter;
    // Set by a failing fetcher callback; the scheduler loop stops once it is non-null.
    private volatile Throwable shuffleError;
    private final HttpConnectionParams httpConnectionParams;
    private final RawLocalFileSystem localFs;
    // Local directories, sorted in the constructor; indexed by hash to pick the shared-fetch lock path.
    private final Path[] localDisks;
    private final String localhostName;
    private final int shufflePort;
    // Set to the elapsed time since startTime when the scheduler loop exits.
    private final TezCounter shufflePhaseTime;
    // Elapsed-time stamps maintained by updateEventReceivedTime().
    private final TezCounter firstEventReceived;
    private final TezCounter lastEventReceived;

    // Per input identifier: which attempt is being fetched and which of its events were processed.
    @VisibleForTesting
    final Map<Integer, ShuffleEventInfo> shuffleInfoEventsMap;
    // Decompiler rendering of the compiler's assertion flag; true means `assert`-style checks are skipped.
    static final /* synthetic */ boolean $assertionsDisabled;
    private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
    private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
    private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
    private final AtomicInteger numFetchedSpills = new AtomicInteger(0);
    // `lock` guards the scheduler hand-off; `wakeLoop` is signalled whenever the scheduler should re-check its state.
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeLoop = this.lock.newCondition();
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0);
    private final BlockingQueue<FetchedInput> completedInputs = new LinkedBlockingDeque();
    // Hosts seen so far, keyed by InputHost.createIdentifier(host, port).
    private final ConcurrentMap<String, InputHost> knownSrcHosts = new ConcurrentHashMap();
    // Hosts queued for the scheduler to build fetchers for.
    private final BlockingQueue<InputHost> pendingHosts = new LinkedBlockingQueue();
    // Attempts marked obsolete; they are dropped when work is assigned to a fetcher.
    private final Set<InputAttemptIdentifier> obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap());
    // Fetchers that were created by the scheduler and have not yet reported completion.
    private Set<Fetcher> runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap());
    private final long startTime = System.currentTimeMillis();
    // Refreshed (under `lock`) each time a fetch succeeds.
    private long lastProgressTime = this.startTime;
    private final LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager$FetchFutureCallback.class */
    public class FetchFutureCallback implements FutureCallback<FetchResult> {
        private final Fetcher fetcher;
        static final /* synthetic */ boolean $assertionsDisabled;

        public FetchFutureCallback(Fetcher fetcher) {
            this.fetcher = fetcher;
        }

        private void doBookKeepingForFetcherComplete() {
            ShuffleManager.this.lock.lock();
            try {
                ShuffleManager.this.runningFetchers.remove(this.fetcher);
                ShuffleManager.this.wakeLoop.signal();
                ShuffleManager.this.lock.unlock();
            } catch (Throwable th) {
                ShuffleManager.this.lock.unlock();
                throw th;
            }
        }

        public void onSuccess(FetchResult fetchResult) {
            this.fetcher.shutdown();
            if (ShuffleManager.this.isShutdown.get()) {
                if (ShuffleManager.LOG.isDebugEnabled()) {
                    ShuffleManager.LOG.debug(ShuffleManager.this.srcNameTrimmed + ": Already shutdown. Ignoring event from fetcher");
                    return;
                }
                return;
            }
            Iterable<InputAttemptIdentifier> pendingInputs = fetchResult.getPendingInputs();
            if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
                InputHost inputHost = (InputHost) ShuffleManager.this.knownSrcHosts.get(InputHost.createIdentifier(fetchResult.getHost(), fetchResult.getPort()));
                if (!$assertionsDisabled && inputHost == null) {
                    throw new AssertionError();
                }
                Iterator<InputAttemptIdentifier> it = pendingInputs.iterator();
                while (it.hasNext()) {
                    inputHost.addKnownInput(it.next());
                }
                inputHost.setAdditionalInfo(fetchResult.getAdditionalInfo());
                ShuffleManager.this.pendingHosts.add(inputHost);
            }
            doBookKeepingForFetcherComplete();
        }

        public void onFailure(Throwable th) {
            this.fetcher.shutdown();
            if (ShuffleManager.this.isShutdown.get()) {
                if (ShuffleManager.LOG.isDebugEnabled()) {
                    ShuffleManager.LOG.debug(ShuffleManager.this.srcNameTrimmed + ": Already shutdown. Ignoring error from fetcher: " + th);
                }
            } else {
                ShuffleManager.LOG.error(ShuffleManager.this.srcNameTrimmed + ": Fetcher failed with error: ", th);
                ShuffleManager.this.shuffleError = th;
                ShuffleManager.this.inputContext.reportFailure(TaskFailureType.NON_FATAL, th, "Fetch failed");
                doBookKeepingForFetcherComplete();
            }
        }

        static {
            $assertionsDisabled = !ShuffleManager.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager$NullFetchedInput.class */
    /**
     * Placeholder {@link FetchedInput} for a source that produced no data.
     *
     * <p>It is created as a {@code MEMORY} input with both sizes set to -1 and carries
     * only the attempt identifier; every data or lifecycle operation is rejected with
     * {@link UnsupportedOperationException}.
     */
    public static class NullFetchedInput extends FetchedInput {
        private static final String UNSUPPORTED_MESSAGE = "Not supported for NullFetchedInput";

        public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
            super(FetchedInput.Type.MEMORY, -1L, -1L, inputAttemptIdentifier, null);
        }

        /** Always throws: there is nothing to write. */
        @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInput
        public OutputStream getOutputStream() throws IOException {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        /** Always throws: there is nothing to read. */
        @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInput
        public InputStream getInputStream() throws IOException {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        /** Always throws. */
        @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInput
        public void commit() throws IOException {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        /** Always throws. */
        @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInput
        public void abort() throws IOException {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }

        /** Always throws. */
        @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInput
        public void free() {
            throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager$RunShuffleCallable.class */
    public class RunShuffleCallable extends CallableWithNdc<Void> {
        private final Configuration conf;

        public RunShuffleCallable(Configuration configuration) {
            this.conf = configuration;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Type inference failed for: r0v105, types: [org.apache.tez.runtime.library.common.shuffle.Fetcher, java.lang.Object, java.util.concurrent.Callable] */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        public Void m24callInternal() throws Exception {
            while (!ShuffleManager.this.isShutdown.get() && ShuffleManager.this.numCompletedInputs.get() < ShuffleManager.this.numInputs) {
                ShuffleManager.this.lock.lock();
                try {
                    if ((ShuffleManager.this.runningFetchers.size() >= ShuffleManager.this.numFetchers || ShuffleManager.this.pendingHosts.isEmpty()) && ShuffleManager.this.numCompletedInputs.get() < ShuffleManager.this.numInputs) {
                        ShuffleManager.this.wakeLoop.await();
                    }
                    if (ShuffleManager.this.shuffleError != null) {
                        break;
                    }
                    if (ShuffleManager.LOG.isDebugEnabled()) {
                        ShuffleManager.LOG.debug(ShuffleManager.this.srcNameTrimmed + ": NumCompletedInputs: " + ShuffleManager.this.numCompletedInputs);
                    }
                    if (ShuffleManager.this.numCompletedInputs.get() < ShuffleManager.this.numInputs && !ShuffleManager.this.isShutdown.get()) {
                        ShuffleManager.this.lock.lock();
                        try {
                            int size = ShuffleManager.this.numFetchers - ShuffleManager.this.runningFetchers.size();
                            int i = 0;
                            while (true) {
                                if (ShuffleManager.this.pendingHosts.peek() == null || ShuffleManager.this.isShutdown.get()) {
                                    break;
                                }
                                try {
                                    InputHost inputHost = (InputHost) ShuffleManager.this.pendingHosts.take();
                                    if (ShuffleManager.LOG.isDebugEnabled()) {
                                        ShuffleManager.LOG.debug(ShuffleManager.this.srcNameTrimmed + ": Processing pending host: " + inputHost.toDetailedString());
                                    }
                                    if (inputHost.getNumPendingInputs() > 0 && !ShuffleManager.this.isShutdown.get()) {
                                        ?? constructFetcherForHost = ShuffleManager.this.constructFetcherForHost(inputHost, this.conf);
                                        ShuffleManager.this.runningFetchers.add(constructFetcherForHost);
                                        if (ShuffleManager.this.isShutdown.get()) {
                                            ShuffleManager.LOG.info(ShuffleManager.this.srcNameTrimmed + ": hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                                            break;
                                        }
                                        Futures.addCallback(ShuffleManager.this.fetcherExecutor.submit((Callable) constructFetcherForHost), new FetchFutureCallback(constructFetcherForHost));
                                        i++;
                                        if (i >= size) {
                                            break;
                                        }
                                    } else if (ShuffleManager.LOG.isDebugEnabled()) {
                                        ShuffleManager.LOG.debug(ShuffleManager.this.srcNameTrimmed + ": Skipping host: " + inputHost.getIdentifier() + " since it has no inputs to process");
                                    }
                                } catch (InterruptedException e) {
                                    if (!ShuffleManager.this.isShutdown.get()) {
                                        throw e;
                                    }
                                    ShuffleManager.LOG.info(ShuffleManager.this.srcNameTrimmed + ": Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                                    Thread.currentThread().interrupt();
                                }
                            }
                            ShuffleManager.this.lock.unlock();
                        } finally {
                        }
                    }
                } finally {
                    ShuffleManager.this.lock.unlock();
                }
            }
            ShuffleManager.this.shufflePhaseTime.setValue(System.currentTimeMillis() - ShuffleManager.this.startTime);
            ShuffleManager.LOG.info(ShuffleManager.this.srcNameTrimmed + ": Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
            if (ShuffleManager.this.fetcherExecutor.isShutdown()) {
                return null;
            }
            ShuffleManager.this.fetcherExecutor.shutdownNow();
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager$SchedulerFutureCallback.class */
    public class SchedulerFutureCallback implements FutureCallback<Void> {
        private SchedulerFutureCallback() {
        }

        public void onSuccess(Void r5) {
            ShuffleManager.LOG.info(ShuffleManager.this.srcNameTrimmed + ": Scheduler thread completed");
        }

        public void onFailure(Throwable th) {
            if (!ShuffleManager.this.isShutdown.get()) {
                ShuffleManager.LOG.error(ShuffleManager.this.srcNameTrimmed + ": Scheduler failed with error: ", th);
                ShuffleManager.this.inputContext.reportFailure(TaskFailureType.NON_FATAL, th, "Shuffle Scheduler Failed");
            } else if (ShuffleManager.LOG.isDebugEnabled()) {
                ShuffleManager.LOG.debug(ShuffleManager.this.srcNameTrimmed + ": Already shutdown. Ignoring error: " + th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager$ShuffleEventInfo.class */
    /**
     * Tracks, for one input attempt, which spill events have been processed and which
     * event id is the final one. The attempt is done once a final event id is known and
     * exactly {@code finalEventId + 1} events have been recorded.
     */
    public static class ShuffleEventInfo {
        int attemptNum;
        String id;
        int finalEventId = -1;
        BitSet eventsProcessed = new BitSet();

        ShuffleEventInfo(InputAttemptIdentifier inputAttemptIdentifier) {
            this.attemptNum = inputAttemptIdentifier.getAttemptNumber();
            // "<inputIdentifier>_<attemptNumber>"
            this.id = inputAttemptIdentifier.getInputIdentifier() + "_" + this.attemptNum;
        }

        /**
         * Records spill {@code i} as processed.
         *
         * @throws IllegalStateException if the final event id is known and more events than
         *         {@code finalEventId + 1} had already been recorded before this call
         */
        void spillProcessed(int i) {
            if (finalEventId != -1) {
                int processedSoFar = eventsProcessed.cardinality();
                String message = "Wrong state. eventsProcessed cardinality=" + processedSoFar + " finalEventId=" + finalEventId + ", spillId=" + i + ", " + toString();
                Preconditions.checkState(processedSoFar <= finalEventId + 1, message);
            }
            eventsProcessed.set(i);
        }

        void setFinalEventId(int i) {
            finalEventId = i;
        }

        /** Returns true once the final event id is set and all events up to it were recorded. */
        boolean isDone() {
            int processedCount = eventsProcessed.cardinality();
            if (LOG.isDebugEnabled()) {
                LOG.debug("finalEventId=" + finalEventId + ", eventsProcessed cardinality=" + processedCount);
            }
            if (finalEventId == -1) {
                return false;
            }
            return finalEventId + 1 == processedCount;
        }

        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("[eventsProcessed=").append(eventsProcessed);
            text.append(", finalEventId=").append(finalEventId);
            text.append(", id=").append(id);
            text.append(", attemptNum=").append(attemptNum);
            text.append("]");
            return text.toString();
        }
    }

    public ShuffleManager(InputContext inputContext, Configuration configuration, int i, int i2, boolean z, int i3, CompressionCodec compressionCodec, FetchedInputAllocator fetchedInputAllocator) throws IOException {
        this.inputContext = inputContext;
        this.numInputs = i;
        this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
        this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
        this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
        this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
        this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
        this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
        this.bytesShuffledDirectDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
        this.ifileBufferSize = i2;
        this.ifileReadAhead = z;
        this.ifileReadAheadLength = i3;
        this.codec = compressionCodec;
        this.inputManager = fetchedInputAllocator;
        this.localDiskFetchEnabled = configuration.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
        this.sharedFetchEnabled = configuration.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH, false);
        this.verifyDiskChecksum = configuration.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM, true);
        this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
        this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
        this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
        this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
        this.completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap(i));
        this.numFetchers = Math.min(configuration.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 20), i);
        this.fetcherExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(this.numFetchers, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher {" + this.srcNameTrimmed + "} #%d").build()));
        this.schedulerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShuffleRunner {" + this.srcNameTrimmed + "}").build()));
        this.schedulerCallable = new RunShuffleCallable(configuration);
        this.jobTokenSecretMgr = new JobTokenSecretManager(ShuffleUtils.getJobTokenSecretFromTokenBytes(inputContext.getServiceConsumerMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID)));
        this.asyncHttp = configuration.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
        this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(configuration);
        this.localFs = FileSystem.getLocal(configuration).getRaw();
        this.localDisks = (Path[]) Iterables.toArray(this.localDirAllocator.getAllLocalPathsToRead(".", configuration), Path.class);
        this.localhostName = inputContext.getExecutionContext().getHostName();
        this.shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));
        this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, configuration.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, 20)));
        Arrays.sort(this.localDisks);
        this.shuffleInfoEventsMap = new ConcurrentHashMap();
        LOG.info(this.srcNameTrimmed + ": numInputs=" + i + ", compressionCodec=" + (compressionCodec == null ? "NoCompressionCodec" : compressionCodec.getClass().getName()) + ", numFetchers=" + this.numFetchers + ", ifileBufferSize=" + this.ifileBufferSize + ", ifileReadAheadEnabled=" + this.ifileReadAhead + ", ifileReadAheadLength=" + i3 + ", localDiskFetchEnabled=" + this.localDiskFetchEnabled + ", sharedFetchEnabled=" + this.sharedFetchEnabled + ", " + this.httpConnectionParams.toString() + ", maxTaskOutputAtOnce=" + this.maxTaskOutputAtOnce);
    }

    /**
     * Starts shuffling: submits the scheduler task to its single-thread executor,
     * attaches the completion callback, and then closes the executor to further tasks.
     *
     * @throws IllegalStateException if no input manager was configured
     */
    public void run() throws IOException {
        boolean inputManagerConfigured = this.inputManager != null;
        Preconditions.checkState(inputManagerConfigured, "InputManager must be configured");
        com.google.common.util.concurrent.ListenableFuture<Void> schedulerFuture = this.schedulerExecutor.submit(this.schedulerCallable);
        Futures.addCallback(schedulerFuture, new SchedulerFutureCallback());
        // No further tasks are accepted; the already submitted scheduler task keeps running.
        this.schedulerExecutor.shutdown();
    }

    /**
     * Checks that a chunk-retrievable (pipelined) input does not arrive from a different
     * attempt than the one already being tracked for the same input identifier.
     *
     * @return true if the attempt may be processed; false after a fatal error has been
     *         reported because another attempt of the same input is already tracked
     */
    private boolean validateInputAttemptForPipelinedShuffle(InputAttemptIdentifier inputAttemptIdentifier) {
        if (!inputAttemptIdentifier.canRetrieveInputInChunks()) {
            return true;
        }
        ShuffleEventInfo tracked = this.shuffleInfoEventsMap.get(Integer.valueOf(inputAttemptIdentifier.getInputIdentifier()));
        if (tracked == null) {
            return true;
        }
        if (inputAttemptIdentifier.getAttemptNumber() == tracked.attemptNum) {
            return true;
        }
        IOException cause = new IOException();
        String message = inputAttemptIdentifier + " already exists. Previous attempt's data could have been already merged to memory/disk outputs.  Failing the fetch early. currentAttemptNum=" + tracked.attemptNum
                + ", eventsProcessed=" + tracked.eventsProcessed
                + ", newAttemptNum=" + inputAttemptIdentifier.getAttemptNumber();
        reportFatalError(cause, message);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a {@link Fetcher} for the given host and assigns it a batch of that host's
     * pending inputs.
     *
     * <p>All pending inputs are taken from the host. Inputs that are already completed or
     * obsoleted are dropped; once {@code maxTaskOutputAtOnce} inputs have been accepted,
     * the remaining ones are handed back to the host, which is then re-queued in
     * {@code pendingHosts}. Inputs that fail pipelined-shuffle validation are left in the
     * batch (a fatal error has already been reported for them).
     *
     * @param inputHost host to fetch from
     * @param configuration configuration passed to the fetcher builder
     * @return the configured fetcher
     */
    public Fetcher constructFetcherForHost(InputHost inputHost, Configuration configuration) {
        Path lockPath = null;
        if (this.sharedFetchEnabled) {
            int hash = Objects.hashCode(new Object[]{this.srcNameTrimmed, inputHost.getHost()});
            // abs(hash % n) instead of abs(hash) % n: Math.abs(Integer.MIN_VALUE) is negative and
            // would index before the array. For every other hash both forms give the same index.
            int diskIndex = Math.abs(hash % this.localDisks.length);
            lockPath = new Path(this.localDisks[diskIndex], "locks");
        }
        Fetcher.FetcherBuilder fetcherBuilder = new Fetcher.FetcherBuilder(this, this.httpConnectionParams, this.inputManager, this.inputContext.getApplicationId(), this.inputContext.getDagIdentifier(), this.jobTokenSecretMgr, this.srcNameTrimmed, configuration, this.localFs, this.localDirAllocator, lockPath, this.localDiskFetchEnabled, this.sharedFetchEnabled, this.localhostName, this.shufflePort, this.asyncHttp, this.verifyDiskChecksum);
        if (this.codec != null) {
            fetcherBuilder.setCompressionParameters(this.codec);
        }
        fetcherBuilder.setIFileParams(this.ifileReadAhead, this.ifileReadAheadLength);

        List<InputAttemptIdentifier> inputsForFetcher = inputHost.clearAndGetPendingInputs();
        int accepted = 0;
        Iterator<InputAttemptIdentifier> it = inputsForFetcher.iterator();
        while (it.hasNext()) {
            InputAttemptIdentifier input = it.next();
            if (!validateInputAttemptForPipelinedShuffle(input)) {
                continue;
            }
            if (this.completedInputSet.contains(Integer.valueOf(input.getInputIdentifier()))) {
                // Already fetched, possibly through another attempt.
                it.remove();
            } else if (this.obsoletedInputs.contains(input)) {
                it.remove();
            } else if (accepted >= this.maxTaskOutputAtOnce) {
                // Batch is full: give the input back to the host for a later fetcher.
                it.remove();
                inputHost.addKnownInput(input);
            } else {
                accepted++;
            }
        }
        if (inputHost.getNumPendingInputs() > 0) {
            this.pendingHosts.add(inputHost);
        }
        fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), inputHost.getSrcPhysicalIndex(), inputsForFetcher);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Created Fetcher for host: " + inputHost.getHost() + ", info: " + inputHost.getAdditionalInfo() + ", with inputs: " + inputsForFetcher);
        }
        return fetcherBuilder.build();
    }

    /**
     * Registers an input attempt as available on the given host and queues the host for
     * the scheduler.
     *
     * @param str host name
     * @param i port
     * @param inputAttemptIdentifier the attempt that can be fetched from that host
     * @param i2 passed to the {@link InputHost} constructor when the host is first seen
     * @throws TezUncheckedException if the host cannot be added to the pending queue
     */
    public void addKnownInput(String str, int i, InputAttemptIdentifier inputAttemptIdentifier, int i2) {
        String hostKey = InputHost.createIdentifier(str, i);
        InputHost inputHost = this.knownSrcHosts.get(hostKey);
        if (inputHost == null) {
            inputHost = new InputHost(str, i, this.inputContext.getApplicationId(), i2);
            if (!$assertionsDisabled && !hostKey.equals(inputHost.getIdentifier())) {
                throw new AssertionError();
            }
            // Another thread may have registered the same host in the meantime; use its instance.
            InputHost existing = this.knownSrcHosts.putIfAbsent(hostKey, inputHost);
            if (existing != null) {
                inputHost = existing;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.srcNameTrimmed + ": Adding input: " + inputAttemptIdentifier + ", to host: " + inputHost);
        }
        if (!validateInputAttemptForPipelinedShuffle(inputAttemptIdentifier)) {
            return;
        }
        Integer inputKey = Integer.valueOf(inputAttemptIdentifier.getInputIdentifier());
        if (this.shuffleInfoEventsMap.get(inputKey) == null) {
            this.shuffleInfoEventsMap.put(inputKey, new ShuffleEventInfo(inputAttemptIdentifier));
        }
        inputHost.addKnownInput(inputAttemptIdentifier);
        this.lock.lock();
        try {
            if (!this.pendingHosts.offer(inputHost)) {
                String failure = "Unable to add host: " + inputHost.getIdentifier() + " to pending queue";
                LOG.error(failure);
                throw new TezUncheckedException(failure);
            }
            this.wakeLoop.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Marks an input that has no data as complete by registering a
     * {@link NullFetchedInput} for it (unless it is already complete), then wakes the
     * scheduler loop.
     *
     * @param inputAttemptIdentifier the attempt that produced no data
     */
    public void addCompletedInputWithNoData(InputAttemptIdentifier inputAttemptIdentifier) {
        int inputIdentifier = inputAttemptIdentifier.getInputIdentifier();
        if (LOG.isDebugEnabled()) {
            LOG.debug("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
        }
        Integer inputKey = Integer.valueOf(inputIdentifier);
        // Cheap unsynchronized check first, repeated under the monitor before registering.
        if (!this.completedInputSet.contains(inputKey)) {
            synchronized (this.completedInputSet) {
                if (!this.completedInputSet.contains(inputKey)) {
                    NullFetchedInput emptyInput = new NullFetchedInput(inputAttemptIdentifier);
                    if (inputAttemptIdentifier.canRetrieveInputInChunks()) {
                        registerCompletedInputForPipelinedShuffle(inputAttemptIdentifier, emptyInput);
                    } else {
                        registerCompletedInput(emptyInput);
                    }
                }
            }
        }
        this.lock.lock();
        try {
            this.wakeLoop.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records when an event was received, as milliseconds elapsed since this manager was
     * created: the first-event counter is set only while it is still 0, the last-event
     * counter is set on every call.
     */
    public synchronized void updateEventReceivedTime() {
        long elapsedMillis = System.currentTimeMillis() - this.startTime;
        boolean firstEventNotYetRecorded = this.firstEventReceived.getValue() == 0;
        if (firstEventNotYetRecorded) {
            this.firstEventReceived.setValue(elapsedMillis);
        }
        this.lastEventReceived.setValue(elapsedMillis);
    }

    /**
     * Marks an input attempt as obsolete. Obsoleted attempts are dropped when work is
     * next assigned to a fetcher.
     *
     * @param inputAttemptIdentifier the attempt to obsolete
     */
    public synchronized void obsoleteKnownInput(InputAttemptIdentifier inputAttemptIdentifier) {
        obsoletedInputs.add(inputAttemptIdentifier);
    }

    /**
     * Fetcher callback for a successfully fetched input.
     *
     * <p>The first result for an input identifier is committed, counted and registered
     * as completed, and the scheduler loop is woken. A result for an input that is
     * already complete is aborted instead.
     *
     * <p>Each use of {@code lock} is a separate lock/try/finally section. The decompiled
     * form wrapped the whole method in one {@code try/finally} that unlocked again after
     * the lock had already been released, so every call ended in
     * {@link IllegalMonitorStateException}.
     *
     * @param str host the input was fetched from (not used by this method)
     * @param inputAttemptIdentifier the attempt that was fetched
     * @param fetchedInput the fetched data; committed or aborted here
     * @param j number of bytes added to the shuffled-bytes counters
     * @param j2 number of bytes added to the decompressed-size counter
     * @param j3 passed through to the per-fetch log line (presumably the copy duration - confirm)
     * @throws IOException if committing or aborting the fetched input fails
     */
    @Override // org.apache.tez.runtime.library.common.shuffle.FetcherCallback
    public void fetchSucceeded(String str, InputAttemptIdentifier inputAttemptIdentifier, FetchedInput fetchedInput, long j, long j2, long j3) throws IOException {
        int inputIdentifier = inputAttemptIdentifier.getInputIdentifier();

        // Progress is recorded even when this turns out to be a duplicate of a completed input.
        this.lock.lock();
        try {
            this.lastProgressTime = System.currentTimeMillis();
        } finally {
            this.lock.unlock();
        }
        this.inputContext.notifyProgress();

        boolean committed = false;
        if (!this.completedInputSet.contains(Integer.valueOf(inputIdentifier))) {
            synchronized (this.completedInputSet) {
                // Re-check under the monitor: another fetcher may have completed this input meanwhile.
                if (!this.completedInputSet.contains(Integer.valueOf(inputIdentifier))) {
                    fetchedInput.commit();
                    committed = true;
                    ShuffleUtils.logIndividualFetchComplete(LOG, j3, j, j2, fetchedInput.getType().toString(), inputAttemptIdentifier);

                    // Counters are only updated for inputs that were actually committed.
                    this.shuffledInputsCounter.increment(1L);
                    this.bytesShuffledCounter.increment(j);
                    if (fetchedInput.getType() == FetchedInput.Type.MEMORY) {
                        this.bytesShuffledToMemCounter.increment(j);
                    } else if (fetchedInput.getType() == FetchedInput.Type.DISK) {
                        this.bytesShuffledToDiskCounter.increment(j);
                    } else if (fetchedInput.getType() == FetchedInput.Type.DISK_DIRECT) {
                        this.bytesShuffledDirectDiskCounter.increment(j);
                    }
                    this.decompressedDataSizeCounter.increment(j2);

                    if (inputAttemptIdentifier.canRetrieveInputInChunks()) {
                        registerCompletedInputForPipelinedShuffle(inputAttemptIdentifier, fetchedInput);
                    } else {
                        registerCompletedInput(fetchedInput);
                    }

                    this.lock.lock();
                    try {
                        this.totalBytesShuffledTillNow += j;
                        logProgress();
                    } finally {
                        this.lock.unlock();
                    }
                }
            }
        }

        if (!committed) {
            // Duplicate of an input that is already complete: discard the fetched data.
            fetchedInput.abort();
            return;
        }

        // Let the scheduler loop re-check whether all inputs are done.
        this.lock.lock();
        try {
            this.wakeLoop.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Registers a fetched input as completed: queues it via
     * {@link #maybeInformInputReady}, records its identifier via
     * {@link #adjustCompletedInputs}, and increments the fetched-spill count.
     * All three steps run while holding {@code lock}.
     *
     * @param fetchedInput the input that has finished fetching
     */
    private void registerCompletedInput(FetchedInput fetchedInput) {
        this.lock.lock();
        try {
            maybeInformInputReady(fetchedInput);
            adjustCompletedInputs(fetchedInput);
            this.numFetchedSpills.getAndIncrement();
        } finally {
            // Released on both the normal and the exceptional path.
            this.lock.unlock();
        }
    }

    /**
     * Appends the input to {@code completedInputs} and, the first time this happens,
     * tells the input context that the input is ready.
     *
     * @param fetchedInput the input to enqueue
     */
    private void maybeInformInputReady(FetchedInput fetchedInput) {
        this.lock.lock();
        try {
            this.completedInputs.add(fetchedInput);
            // getAndSet returns the previous flag value, so only one caller ever sees false.
            boolean alreadyNotified = this.inputReadyNotificationSent.getAndSet(true);
            if (!alreadyNotified) {
                this.inputContext.inputIsReady();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Records the input's identifier in {@code completedInputSet}, bumps the
     * completed-input count, and logs once that count equals {@code numInputs}.
     *
     * @param fetchedInput the input whose identifier is recorded as completed
     */
    private void adjustCompletedInputs(FetchedInput fetchedInput) {
        this.lock.lock();
        try {
            int completedId = fetchedInput.getInputAttemptIdentifier().getInputIdentifier();
            this.completedInputSet.add(Integer.valueOf(completedId));
            boolean everythingFetched = this.numCompletedInputs.incrementAndGet() == this.numInputs;
            if (everythingFetched) {
                LOG.info("All inputs fetched for input vertex : " + this.inputContext.getSourceVertexName());
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Registers a fetched spill for an input that is retrieved in chunks.
     *
     * <p>Does nothing when {@code validateInputAttemptForPipelinedShuffle} rejects the
     * attempt. Otherwise the spill is marked as processed on the per-input
     * {@code ShuffleEventInfo}, the fetched input is queued, and the input is recorded as
     * completed only once the event info reports that it is done.
     *
     * @param inputAttemptIdentifier the attempt (and spill) that was fetched
     * @param fetchedInput the fetched data for that spill
     */
    private void registerCompletedInputForPipelinedShuffle(InputAttemptIdentifier inputAttemptIdentifier, FetchedInput fetchedInput) {
        if (!validateInputAttemptForPipelinedShuffle(inputAttemptIdentifier)) {
            return;
        }

        Integer inputKey = Integer.valueOf(inputAttemptIdentifier.getInputIdentifier());
        ShuffleEventInfo eventInfo = this.shuffleInfoEventsMap.get(inputKey);
        // No tracking entry exists yet and the fetched input is a NullFetchedInput: create one here.
        if (eventInfo == null && (fetchedInput instanceof NullFetchedInput)) {
            eventInfo = new ShuffleEventInfo(inputAttemptIdentifier);
            this.shuffleInfoEventsMap.put(inputKey, eventInfo);
        }
        // Decompiled form of an assertion that the event info is non-null.
        if (!$assertionsDisabled && eventInfo == null) {
            throw new AssertionError();
        }

        eventInfo.spillProcessed(inputAttemptIdentifier.getSpillEventId());
        this.numFetchedSpills.getAndIncrement();
        if (inputAttemptIdentifier.getFetchTypeInfo() == InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE) {
            eventInfo.setFinalEventId(inputAttemptIdentifier.getSpillEventId());
        }

        this.lock.lock();
        try {
            // The spill is queued right away; completion is claimed only when isDone() is true.
            maybeInformInputReady(fetchedInput);
            if (eventInfo.isDone()) {
                adjustCompletedInputs(fetchedInput);
                this.shuffleInfoEventsMap.remove(inputKey);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("eventInfo " + eventInfo.toString());
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Logs the message at error level and reports the failure to the input context.
     *
     * <p>Note that, despite this method's name, the failure is reported with
     * {@code TaskFailureType.NON_FATAL}.
     *
     * @param th the cause to report; callers in this file may pass {@code null}
     * @param str the diagnostic message, used for both the log line and the report
     */
    private void reportFatalError(Throwable th, String str) {
        LOG.error(str);
        this.inputContext.reportFailure(
                TaskFailureType.NON_FATAL,
                th,
                str);
    }

    /**
     * Callback invoked when a fetch has failed.
     *
     * <p>Logs the failure, increments the failed-shuffle counter and notifies progress.
     * For a {@code null} source a failure is reported through {@link #reportFatalError};
     * otherwise a single {@code InputReadErrorEvent} describing the failed source attempt
     * is sent through the input context.
     *
     * @param str not used by this method
     * @param inputAttemptIdentifier the source attempt whose fetch failed; may be {@code null}
     * @param z connect-failed flag; only included in the log line here
     */
    @Override // org.apache.tez.runtime.library.common.shuffle.FetcherCallback
    public void fetchFailed(String str, InputAttemptIdentifier inputAttemptIdentifier, boolean z) {
        LOG.info(this.srcNameTrimmed + ": Fetch failed for src: " + inputAttemptIdentifier + "InputIdentifier: " + inputAttemptIdentifier + ", connectFailed: " + z);
        this.failedShufflesCounter.increment(1L);
        this.inputContext.notifyProgress();

        if (inputAttemptIdentifier == null) {
            reportFatalError(null, "Received fetchFailure for an unknown src (null)");
            return;
        }

        String failedAttempt = TezRuntimeUtils.getTaskAttemptIdentifier(
                this.inputContext.getSourceVertexName(),
                inputAttemptIdentifier.getInputIdentifier(),
                inputAttemptIdentifier.getAttemptNumber());
        InputReadErrorEvent readError = InputReadErrorEvent.create(
                "Fetch failure while fetching from " + failedAttempt,
                inputAttemptIdentifier.getInputIdentifier(),
                inputAttemptIdentifier.getAttemptNumber());

        // Exactly one event is sent per failed fetch.
        ArrayList failureEvents = Lists.newArrayListWithCapacity(1);
        failureEvents.add(readError);
        this.inputContext.sendEvents(failureEvents);
    }

    /**
     * Shuts this manager down. Only the first call has any effect beyond logging; later
     * calls return once they see the {@code isShutdown} flag already set.
     *
     * <p>While holding {@code lock}, the {@code wakeLoop} condition is signalled, every
     * running fetcher is shut down, and the scheduler and fetcher executors are stopped
     * with {@code shutdownNow()} if they exist and are not already shut down.
     *
     * @throws InterruptedException declared by the signature; not thrown directly by the
     *         statements visible in this method
     */
    public void shutdown() throws InterruptedException {
        if (Thread.currentThread().isInterrupted()) {
            LOG.info(this.srcNameTrimmed + ": Thread interrupted. Need to cleanup the local dirs");
        }

        boolean firstShutdownCall = !this.isShutdown.getAndSet(true);
        if (firstShutdownCall) {
            LOG.info("Shutting down pending fetchers on source" + this.srcNameTrimmed + ": " + this.runningFetchers.size());
            this.lock.lock();
            try {
                this.wakeLoop.signal();
                for (Iterator<Fetcher> fetchers = this.runningFetchers.iterator(); fetchers.hasNext();) {
                    fetchers.next().shutdown();
                }
                if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
                    this.schedulerExecutor.shutdownNow();
                }
                if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
                    this.fetcherExecutor.shutdownNow();
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Reports whether the head of {@code completedInputs} is a real input.
     *
     * @return {@code true} if the queue is non-empty and its head is not a
     *         {@code NullFetchedInput}; {@code false} otherwise
     */
    public boolean newInputAvailable() {
        FetchedInput head = this.completedInputs.peek();
        return head != null && !(head instanceof NullFetchedInput);
    }

    /**
     * Reports whether the number of completed inputs equals {@code numInputs}.
     * The comparison is made while holding {@code lock}.
     *
     * @return {@code true} when every expected input has been recorded as completed
     */
    public boolean allInputsFetched() {
        boolean allFetched;
        this.lock.lock();
        try {
            allFetched = this.numCompletedInputs.get() == this.numInputs;
        } finally {
            this.lock.unlock();
        }
        return allFetched;
    }

    /**
     * Returns the next fetched input, blocking until one is available.
     *
     * <p>{@code NullFetchedInput} entries taken from the queue are skipped. When the queue
     * is empty and {@link #allInputsFetched()} is true, {@code null} is returned.
     *
     * <p>{@code lock} is held only for the peek/termination check and is released exactly
     * once per iteration, before the blocking {@code take()}. The thread therefore never
     * blocks while holding the lock and never unlocks a lock it does not hold.
     *
     * @return the next non-null-type input, or {@code null} if there is nothing left to return
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    public FetchedInput getNextInput() throws InterruptedException {
        FetchedInput next;
        do {
            this.lock.lock();
            try {
                next = this.completedInputs.peek();
                if (next == null && allInputsFetched()) {
                    // Queue is empty and all inputs are fetched: nothing more to return.
                    return null;
                }
            } finally {
                this.lock.unlock();
            }
            // Blocks until an element is available; done outside the lock on purpose.
            next = this.completedInputs.take();
        } while (next instanceof NullFetchedInput);
        return next;
    }

    /**
     * Emits a progress line with the completed-input count, fetched-spill count and
     * cumulative transfer rate.
     *
     * <p>A line is written when the completed count exceeds the next reporting threshold
     * or equals {@code numInputs}; each time, the threshold is advanced by 50.
     */
    private void logProgress() {
        int completed = this.numCompletedInputs.get();
        boolean thresholdPassed = completed > this.nextProgressLineEventCount.get();
        if (thresholdPassed || completed == this.numInputs) {
            this.nextProgressLineEventCount.addAndGet(50);
            int spillsFetched = this.numFetchedSpills.get();
            double megabytesFetched = this.totalBytesShuffledTillNow / 1048576.0d;
            // Whole elapsed seconds plus one, so the divisor is never zero.
            long elapsedSeconds = ((System.currentTimeMillis() - this.startTime) / 1000) + 1;
            String rate = this.mbpsFormat.format(megabytesFetched / elapsedSeconds);
            LOG.info("copy(" + completed + " (spillsFetched=" + spillsFetched + ") of " + this.numInputs + ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) " + rate + " MB/s)");
        }
    }

    // Class initializer as emitted by the decompiler.
    static {
        // Cached inverse of the class's desired assertion status; read by the explicit
        // assertion check in registerCompletedInputForPipelinedShuffle.
        $assertionsDisabled = !ShuffleManager.class.desiredAssertionStatus();
        // Class-wide logger used by the methods of ShuffleManager.
        LOG = LoggerFactory.getLogger(ShuffleManager.class);
    }
}
