package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.class */
public class ShuffleScheduler {
    // Per-thread timestamp (ms) recorded in getHost() and read back in freeHost() to log
    // how long the calling thread held a host.
    static ThreadLocal<Long> shuffleStart;
    private static final Log LOG;
    // Base penalty in milliseconds; copyFailed() delays a failed host by 2000 * 1.3^failures.
    private static final long INITIAL_PENALTY = 2000;
    // Growth factor applied per failure of the same input attempt (see copyFailed()).
    private static final float PENALTY_GROWTH_RATE = 1.3f;
    // finishedMaps[inputIndex] is true once that input has been marked finished; also used
    // as the monitor in setInputFinished()/isInputFinished().
    private boolean[] finishedMaps;
    // Total number of inputs this scheduler was created for.
    private final int numInputs;
    // Inputs not yet finished; decremented in copySucceeded(), 0 means done.
    private int remainingMaps;
    private final InputContext inputContext;
    // Receives fatal errors via reportException().
    private final Shuffle shuffle;
    // Incremented once per input fetched with a non-null MapOutput.
    private final TezCounter shuffledInputsCounter;
    // Incremented once per input completed with a null MapOutput.
    private final TezCounter skippedInputCounter;
    // Accumulates the fetched (compressed) byte count of every completed input.
    private final TezCounter reduceShuffleBytes;
    // Accumulates the decompressed byte count of every completed input.
    private final TezCounter reduceBytesDecompressed;
    // Incremented on every copyFailed() call.
    private final TezCounter failedShuffleCounter;
    // Bytes fetched into outputs of type DISK, DISK_DIRECT and all other types, respectively.
    private final TezCounter bytesShuffledToDisk;
    private final TezCounter bytesShuffledToDiskDirect;
    private final TezCounter bytesShuffledToMem;
    // Upper bound on the number of inputs handed out per getMapsForHost() call (clamped to 1..75).
    private int maxTaskOutputAtOnce;
    // A failure is reported to the AM whenever the per-attempt failure count is a multiple of this.
    private int maxFetchFailuresBeforeReporting;
    // When true, failures flagged by the caller are reported to the AM immediately.
    private boolean reportReadErrorImmediately;
    // Number of distinct failing inputs that triggers the health check in checkReducerHealth().
    private int maxFailedUniqueFetches;
    // Per-attempt failure count at which copyFailed() reports an exception: max(30, numInputs / 10).
    private final int abortFailureLimit;
    // Decompiler-exposed assertion switch; true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Known hosts keyed by the identifier produced by MapHost.createIdentifier().
    private Map<String, MapHost> mapLocations = new HashMap();
    // Maps "<pathComponent>_<reduceId>" to the input attempt registered under that path.
    private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap();
    // Hosts in PENDING state that getHost() may hand out.
    private Set<MapHost> pendingHosts = new HashSet();
    // Input attempts that must no longer be fetched (see obsoleteInput()).
    private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet();
    // Used by getHost() to pick a pending host at random.
    private final Random random = new Random(System.currentTimeMillis());
    // Penalized hosts, released by the Referee thread when their delay expires.
    private final DelayQueue<Penalty> penalties = new DelayQueue<>();
    // Failure count per input attempt; an entry is removed when that attempt is fetched successfully.
    private final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap();
    // Failure count per host identifier; an entry is removed on a successful fetch from that host.
    private final Map<String, IntWritable> hostFailures = new HashMap();
    // Largest value passed to informMaxMapRunTime() so far.
    private int maxMapRuntime = 0;
    // Sum of fetched byte counts, used for the cumulative transfer rate in logProgress().
    private long totalBytesShuffledTillNow = 0;
    // Formats transfer rates with two decimals for log output.
    private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
    // Background thread that re-enables penalized hosts; started by the constructor, stopped by close().
    private final Referee referee = new Referee();
    // Creation time of this scheduler (ms since epoch).
    private final long startTime = System.currentTimeMillis();
    // Time of the most recent successful copy; initially the creation time.
    private long lastProgressTime = this.startTime;
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler$Penalty.class */
    /**
     * Delay-queue entry for a penalized host. The entry becomes available once the
     * wall-clock time passes {@code endTime}.
     */
    private static class Penalty implements Delayed {
        MapHost host;
        private long endTime;

        /**
         * @param mapHost host being penalized
         * @param j       penalty duration in milliseconds, counted from now
         */
        Penalty(MapHost mapHost, long j) {
            host = mapHost;
            endTime = System.currentTimeMillis() + j;
        }

        /** Returns the time left until the penalty expires, converted to {@code timeUnit}. */
        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            long remainingMillis = endTime - System.currentTimeMillis();
            return timeUnit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }

        /** Orders penalties by expiry time, earliest first. */
        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            Penalty other = (Penalty) delayed;
            return Long.compare(endTime, other.endTime);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler$Referee.class */
    /**
     * Daemon thread that waits for penalties to expire and makes the affected hosts
     * available again.
     */
    public class Referee extends Thread {
        /**
         * Creates the daemon thread. The enclosing scheduler may construct this object from a
         * field initializer, i.e. before its constructor body has assigned {@code inputContext};
         * naming is therefore null-safe here and repeated at the start of {@link #run()}.
         */
        public Referee() {
            setDaemon(true);
            applyThreadName();
        }

        /** Names the thread after the source vertex, if the input context is already available. */
        private void applyThreadName() {
            InputContext context = ShuffleScheduler.this.inputContext;
            if (context != null) {
                setName("ShufflePenaltyReferee [" + TezUtilsInternal.cleanVertexName(context.getSourceVertexName()) + "]");
            }
        }

        /**
         * Blocks on the penalty queue; for each expired penalty marks the host available and,
         * if it ends up PENDING, re-queues it and wakes threads waiting on the scheduler.
         * Exits silently on interruption; any other failure is reported to the shuffle.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            // The scheduler is fully constructed by the time the thread runs, so the name
            // can always be resolved here even if it could not be in the constructor.
            applyThreadName();
            while (true) {
                try {
                    MapHost expiredHost = ShuffleScheduler.this.penalties.take().host;
                    synchronized (ShuffleScheduler.this) {
                        if (expiredHost.markAvailable() == MapHost.State.PENDING) {
                            ShuffleScheduler.this.pendingHosts.add(expiredHost);
                            ShuffleScheduler.this.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal sent by close().
                    return;
                } catch (Throwable th) {
                    ShuffleScheduler.this.shuffle.reportException(th);
                    return;
                }
            }
        }
    }

    /**
     * Creates the scheduler, starts the penalty referee thread and reads the fetch-related
     * settings from the configuration.
     *
     * @param inputContext  context of the input; used for the source vertex name, counters and events
     * @param configuration source of the fetch-failure and batching settings
     * @param i             number of inputs to fetch
     * @param shuffle       receiver of fatal errors
     * @param tezCounter    counter of shuffled inputs
     * @param tezCounter2   counter of shuffled (fetched) bytes
     * @param tezCounter3   counter of decompressed bytes
     * @param tezCounter4   counter of failed fetches
     * @param tezCounter5   counter of bytes shuffled to disk
     * @param tezCounter6   counter of bytes shuffled to disk directly
     * @param tezCounter7   counter of bytes shuffled to memory
     */
    public ShuffleScheduler(InputContext inputContext, Configuration configuration, int i, Shuffle shuffle, TezCounter tezCounter, TezCounter tezCounter2, TezCounter tezCounter3, TezCounter tezCounter4, TezCounter tezCounter5, TezCounter tezCounter6, TezCounter tezCounter7) {
        // Defaults; reportReadErrorImmediately is overwritten from the configuration below.
        this.reportReadErrorImmediately = true;
        this.maxFailedUniqueFetches = 5;
        this.inputContext = inputContext;
        this.numInputs = i;
        // Abort once a single attempt has failed max(30, 10% of the inputs) times.
        this.abortFailureLimit = Math.max(30, i / 10);
        this.remainingMaps = i;
        // One flag per input; all start out false (not finished).
        this.finishedMaps = new boolean[this.remainingMaps];
        this.shuffle = shuffle;
        this.shuffledInputsCounter = tezCounter;
        this.reduceShuffleBytes = tezCounter2;
        this.reduceBytesDecompressed = tezCounter3;
        this.failedShuffleCounter = tezCounter4;
        this.bytesShuffledToDisk = tezCounter5;
        this.bytesShuffledToDiskDirect = tezCounter6;
        this.bytesShuffledToMem = tezCounter7;
        // Never require more distinct failing inputs than there are inputs.
        this.maxFailedUniqueFetches = Math.min(i, this.maxFailedUniqueFetches);
        // Started only after inputContext and shuffle are assigned; the run loop reports errors to shuffle.
        this.referee.start();
        this.maxFetchFailuresBeforeReporting = configuration.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 5);
        this.reportReadErrorImmediately = configuration.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, true);
        // Configured value (default 20) clamped to the range 1..75.
        this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, configuration.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE, 20)));
        this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
        LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: maxFetchFailuresBeforeReporting=" + this.maxFetchFailuresBeforeReporting + ", reportReadErrorImmediately=" + this.reportReadErrorImmediately + ", maxFailedUniqueFetches=" + this.maxFailedUniqueFetches + ", abortFailureLimit=" + this.abortFailureLimit + ", maxMapRuntime=" + this.maxMapRuntime + ", maxTaskOutputAtOnce=" + this.maxTaskOutputAtOnce);
    }

    /**
     * Records the successful completion of one input.
     *
     * <p>If the input was already finished the call is treated as a duplicate: the output (if
     * any) is aborted and nothing else changes. Otherwise the output is committed (or, when
     * {@code mapOutput} is null, the input is counted as skipped), the input is marked
     * finished and the progress counters are updated.
     *
     * @param inputAttemptIdentifier the attempt that was fetched
     * @param mapHost                host it was fetched from; may be null
     * @param j                      fetched (compressed) size in bytes
     * @param j2                     decompressed size in bytes
     * @param j3                     time taken by the fetch, in milliseconds
     * @param mapOutput              fetched output, or null if there was nothing to fetch
     * @throws IOException propagated from committing the output
     */
    public synchronized void copySucceeded(InputAttemptIdentifier inputAttemptIdentifier, MapHost mapHost, long j, long j2, long j3, MapOutput mapOutput) throws IOException {
        final int inputIndex = inputAttemptIdentifier.getInputIdentifier().getInputIndex();

        if (isInputFinished(inputIndex)) {
            LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + inputAttemptIdentifier);
            if (mapOutput != null) {
                mapOutput.abort();
            }
            return;
        }

        if (mapOutput == null) {
            skippedInputCounter.increment(1L);
        } else {
            // A success clears the failure history of both the attempt and the host.
            failureCounts.remove(inputAttemptIdentifier);
            if (mapHost != null) {
                hostFailures.remove(mapHost.getHostIdentifier());
            }
            mapOutput.commit();
            logIndividualFetchComplete(j3, j, j2, mapOutput, inputAttemptIdentifier);

            TezCounter destinationCounter;
            if (mapOutput.getType() == MapOutput.Type.DISK) {
                destinationCounter = bytesShuffledToDisk;
            } else if (mapOutput.getType() == MapOutput.Type.DISK_DIRECT) {
                destinationCounter = bytesShuffledToDiskDirect;
            } else {
                destinationCounter = bytesShuffledToMem;
            }
            destinationCounter.increment(j);
            shuffledInputsCounter.increment(1L);
        }

        setInputFinished(inputIndex);
        remainingMaps--;
        if (remainingMaps == 0) {
            LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
            // Wake anyone blocked in waitUntilDone().
            notifyAll();
        }

        lastProgressTime = System.currentTimeMillis();
        totalBytesShuffledTillNow += j;
        logProgress();
        reduceShuffleBytes.increment(j);
        reduceBytesDecompressed.increment(j2);

        if (LOG.isDebugEnabled()) {
            LOG.debug("src task: " + TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(), inputIndex, inputAttemptIdentifier.getAttemptNumber()) + " done");
        }
    }

    /**
     * Logs the completion of a single fetch together with its transfer rate.
     *
     * @param j                      time taken in milliseconds; a value of 0 yields a rate of 0
     * @param j2                     compressed size in bytes
     * @param j3                     decompressed size in bytes
     * @param mapOutput              output that was fetched (its type is logged)
     * @param inputAttemptIdentifier attempt that was fetched
     */
    private void logIndividualFetchComplete(long j, long j2, long j3, MapOutput mapOutput, InputAttemptIdentifier inputAttemptIdentifier) {
        // bytes / seconds, scaled to MB/s
        double megabytesPerSecond = (j == 0) ? 0.0d : (j2 / (j / 1000.0d)) / 1048576.0d;
        LOG.info("Completed fetch for attempt: " + inputAttemptIdentifier
                + " to " + mapOutput.getType()
                + ", CompressedSize=" + j2
                + ", DecompressedSize=" + j3
                + ",EndTime=" + System.currentTimeMillis()
                + ", TimeTaken=" + j
                + ", Rate=" + mbpsFormat.format(megabytesPerSecond) + " MB/s");
    }

    /**
     * Logs how many inputs have been copied so far and the cumulative transfer rate
     * (total bytes fetched divided by the whole seconds since creation, plus one).
     */
    private void logProgress() {
        int copiedInputs = numInputs - remainingMaps;
        // Adding one second avoids a division by zero right after start-up.
        long elapsedSeconds = ((System.currentTimeMillis() - startTime) / 1000) + 1;
        double megabytesPerSecond = (totalBytesShuffledTillNow / 1048576.0d) / elapsedSeconds;
        LOG.info("copy(" + copiedInputs + " of " + numInputs
                + ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
                + mbpsFormat.format(megabytesPerSecond) + " MB/s)");
    }

    /**
     * Records a failed fetch: penalizes the host, bumps the per-attempt and per-host failure
     * counts, reports an exception once the attempt reaches {@code abortFailureLimit},
     * possibly informs the AM, checks overall health and finally queues a penalty whose
     * duration grows exponentially with the attempt's failure count.
     *
     * @param inputAttemptIdentifier attempt whose fetch failed
     * @param mapHost                host the fetch was attempted from
     * @param z                      forwarded to {@code checkAndInformAM}; presumably a read-error flag (confirm with callers)
     * @param z2                     forwarded to {@code checkAndInformAM}; presumably a connect-error flag (confirm with callers)
     */
    public synchronized void copyFailed(InputAttemptIdentifier inputAttemptIdentifier, MapHost mapHost, boolean z, boolean z2) {
        mapHost.penalize();

        // Per-attempt failure count.
        int failures;
        IntWritable attemptCounter = failureCounts.get(inputAttemptIdentifier);
        if (attemptCounter == null) {
            failureCounts.put(inputAttemptIdentifier, new IntWritable(1));
            failures = 1;
        } else {
            attemptCounter.set(attemptCounter.get() + 1);
            failures = attemptCounter.get();
        }

        // Per-host failure count.
        String hostId = mapHost.getHostIdentifier();
        IntWritable hostCounter = hostFailures.get(hostId);
        if (hostCounter == null) {
            hostFailures.put(hostId, new IntWritable(1));
        } else {
            hostCounter.set(hostCounter.get() + 1);
        }

        if (failures >= abortFailureLimit) {
            IOException abortCause = new IOException(failures + " failures downloading " + TezRuntimeUtils.getTaskAttemptIdentifier(inputContext.getSourceVertexName(), inputAttemptIdentifier.getInputIdentifier().getInputIndex(), inputAttemptIdentifier.getAttemptNumber()));
            abortCause.fillInStackTrace();
            shuffle.reportException(abortCause);
        }

        failedShuffleCounter.increment(1L);
        checkAndInformAM(failures, inputAttemptIdentifier, z, z2);
        checkReducerHealth();

        // INITIAL_PENALTY * PENALTY_GROWTH_RATE ^ failures, in milliseconds.
        long penaltyMillis = (long) (INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, failures));
        penalties.add(new Penalty(mapHost, penaltyMillis));
    }

    /**
     * Logs a local error and forwards it to the shuffle as a failure.
     *
     * @param iOException the local error that caused the shuffle to fail
     */
    public void reportLocalError(IOException iOException) {
        LOG.error("Shuffle failed : caused by local error", iOException);
        shuffle.reportException(iOException);
    }

    /**
     * Sends an {@code InputReadErrorEvent} for the given attempt when reporting is due.
     *
     * <p>Reporting is due when immediate reporting is enabled and either flag is set, or when
     * the failure count is a multiple of {@code maxFetchFailuresBeforeReporting}. A configured
     * limit of 0 is treated as "report every failure" rather than letting the modulo throw
     * {@link ArithmeticException}.
     *
     * @param i                      number of failures recorded so far for this attempt
     * @param inputAttemptIdentifier attempt whose fetch failed
     * @param z                      first immediate-report flag supplied by the caller
     * @param z2                     second immediate-report flag supplied by the caller
     */
    private void checkAndInformAM(int i, InputAttemptIdentifier inputAttemptIdentifier, boolean z, boolean z2) {
        boolean reportImmediately = this.reportReadErrorImmediately && (z || z2);
        if (!reportImmediately) {
            int reportingInterval = this.maxFetchFailuresBeforeReporting;
            // Guard the modulo: an interval of 0 would otherwise raise ArithmeticException.
            boolean periodicReportDue = reportingInterval == 0 || i % reportingInterval == 0;
            if (!periodicReportDue) {
                return;
            }
        }

        int inputIndex = inputAttemptIdentifier.getInputIdentifier().getInputIndex();
        int attemptNumber = inputAttemptIdentifier.getAttemptNumber();
        String srcTaskAttempt = TezRuntimeUtils.getTaskAttemptIdentifier(this.inputContext.getSourceVertexName(), inputIndex, attemptNumber);

        LOG.info("Reporting fetch failure for InputIdentifier: " + inputAttemptIdentifier + " taskAttemptIdentifier: " + srcTaskAttempt + " to AM.");
        // Raw list kept as in the original; the event element type is not imported in this file.
        ArrayList failureEvents = Lists.newArrayListWithCapacity(1);
        failureEvents.add(InputReadErrorEvent.create("Fetch failure for " + srcTaskAttempt + " to jobtracker.", inputIndex, attemptNumber));
        this.inputContext.sendEvents(failureEvents);
    }

    /**
     * Fails the shuffle when too many distinct inputs are failing and the fetch is neither
     * healthy nor making sufficient progress.
     *
     * <p>The shuffle is failed when all of the following hold:
     * <ul>
     *   <li>the number of distinct failing attempts reached {@code maxFailedUniqueFetches} or
     *       equals the number of pending inputs;</li>
     *   <li>failed fetches make up at least half of all fetch outcomes (not "healthy");</li>
     *   <li>fewer than half of the inputs are done, or the time since the last success is at
     *       least half of max(time until last success, maxMapRuntime) ("stalled").</li>
     * </ul>
     */
    private void checkReducerHealth() {
        long failedFetches = failedShuffleCounter.getValue();
        int doneInputs = numInputs - remainingMaps;
        int pendingInputs = numInputs - doneInputs;

        boolean reducerHealthy = ((float) failedFetches) / ((float) (failedFetches + doneInputs)) < 0.5f;
        boolean reducerProgressedEnough = ((float) doneInputs) / ((float) numInputs) >= 0.5f;

        int stallDuration = (int) (System.currentTimeMillis() - lastProgressTime);
        int shuffleProgressDuration = (int) (lastProgressTime - startTime);
        int minShuffleRunDuration = shuffleProgressDuration > maxMapRuntime ? shuffleProgressDuration : maxMapRuntime;
        boolean reducerStalled = ((float) stallDuration) / ((float) minShuffleRunDuration) >= 0.5f;

        boolean tooManyUniqueFailures = failureCounts.size() >= maxFailedUniqueFetches || failureCounts.size() == pendingInputs;
        if (!tooManyUniqueFailures || reducerHealthy) {
            return;
        }
        if (reducerProgressedEnough && !reducerStalled) {
            return;
        }

        LOG.fatal("Shuffle failed with too many fetch failures and insufficient progress!failureCounts=" + failureCounts.size()
                + ", pendingInputs=" + pendingInputs
                + ", reducerHealthy=" + reducerHealthy
                + ", reducerProgressedEnough=" + reducerProgressedEnough
                + ", reducerStalled=" + reducerStalled);
        shuffle.reportException(new IOException("Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out."));
    }

    /**
     * Registers an available input attempt on the given host, creating the host entry on
     * first use, and wakes waiting fetchers if the host is pending.
     *
     * @param str                    host name
     * @param i                      port; combined with the host name as {@code "host:port"}
     * @param i2                     partition / reduce id used in the host and path identifiers
     * @param str2                   third argument passed to the {@code MapHost} constructor
     * @param inputAttemptIdentifier attempt that is now available on the host
     */
    public synchronized void addKnownMapOutput(String str, int i, int i2, String str2, InputAttemptIdentifier inputAttemptIdentifier) {
        String hostAndPort = str + ":" + i;
        String hostKey = MapHost.createIdentifier(hostAndPort, i2);

        MapHost host = mapLocations.get(hostKey);
        if (host == null) {
            host = new MapHost(i2, hostAndPort, str2);
            // Explicit form of "assert hostKey.equals(host.getIdentifier())".
            if (!$assertionsDisabled && !hostKey.equals(host.getIdentifier())) {
                throw new AssertionError();
            }
            mapLocations.put(hostKey, host);
        }

        host.addKnownMap(inputAttemptIdentifier);
        String pathKey = getIdentifierFromPathAndReduceId(inputAttemptIdentifier.getPathComponent(), i2);
        pathToIdentifierMap.put(pathKey, inputAttemptIdentifier);

        if (host.getState() == MapHost.State.PENDING) {
            pendingHosts.add(host);
            notifyAll();
        }
    }

    /**
     * Marks an input attempt as obsolete so that it is skipped by {@code getMapsForHost}.
     *
     * @param inputAttemptIdentifier attempt that must no longer be fetched
     */
    public synchronized void obsoleteInput(InputAttemptIdentifier inputAttemptIdentifier) {
        LOG.info("Adding obsolete input: " + inputAttemptIdentifier);
        obsoleteInputs.add(inputAttemptIdentifier);
    }

    /**
     * Returns an input attempt to the host's list of known outputs.
     *
     * @param mapHost                host the attempt belongs to
     * @param inputAttemptIdentifier attempt to put back
     */
    public synchronized void putBackKnownMapOutput(MapHost mapHost, InputAttemptIdentifier inputAttemptIdentifier) {
        mapHost.addKnownMap(inputAttemptIdentifier);
    }

    /**
     * Blocks until at least one host is pending, then removes a randomly chosen pending host,
     * marks it busy and records the start time for the calling thread.
     *
     * @return the host assigned to the calling thread
     * @throws InterruptedException if interrupted while waiting for a pending host
     */
    public synchronized MapHost getHost() throws InterruptedException {
        while (pendingHosts.isEmpty()) {
            wait();
        }

        // Walk the set to a random position; the element reached is the one handed out.
        Iterator<MapHost> candidates = pendingHosts.iterator();
        int toSkip = random.nextInt(pendingHosts.size());
        while (toSkip > 0) {
            candidates.next();
            toSkip--;
        }
        MapHost chosen = candidates.next();

        pendingHosts.remove(chosen);
        chosen.markBusy();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Assigning " + chosen + " with " + chosen.getNumKnownMapOutputs() + " to " + Thread.currentThread().getName());
        }
        shuffleStart.set(System.currentTimeMillis());
        return chosen;
    }

    /**
     * Looks up the input attempt registered for a path component and reduce id.
     *
     * @param str path component of the fetched output
     * @param i   reduce id
     * @return the registered attempt, or null if none is known
     */
    public InputAttemptIdentifier getIdentifierForFetchedOutput(String str, int i) {
        String lookupKey = getIdentifierFromPathAndReduceId(str, i);
        return pathToIdentifierMap.get(lookupKey);
    }

    /**
     * Tells whether an attempt still needs fetching: it must be neither obsolete nor
     * belong to an input that is already finished.
     */
    private boolean inputShouldBeConsumed(InputAttemptIdentifier inputAttemptIdentifier) {
        if (obsoleteInputs.contains(inputAttemptIdentifier)) {
            return false;
        }
        return !isInputFinished(inputAttemptIdentifier.getInputIdentifier().getInputIndex());
    }

    /**
     * Takes all known outputs of a host and returns the next batch to fetch.
     *
     * <p>Finished and obsolete attempts are dropped. When several attempts of the same input
     * index are present only the one with the highest attempt number is kept. At most
     * {@code maxTaskOutputAtOnce} attempts are returned; the rest are handed back to the host.
     *
     * @param mapHost host whose known outputs are drained
     * @return attempts to fetch from the host in this round, in first-seen input-index order
     */
    public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost mapHost) {
        // Insertion-ordered so the batch follows the order in which input indexes were first seen.
        Map<Integer, InputAttemptIdentifier> newestByIndex = new LinkedHashMap<>();
        for (InputAttemptIdentifier candidate : mapHost.getAndClearKnownMaps()) {
            if (!inputShouldBeConsumed(candidate)) {
                LOG.info("Ignoring finished or obsolete source: " + candidate);
                continue;
            }
            Integer inputIndex = candidate.getInputIdentifier().getInputIndex();
            InputAttemptIdentifier current = newestByIndex.get(inputIndex);
            if (current != null && current.getAttemptNumber() >= candidate.getAttemptNumber()) {
                continue;
            }
            newestByIndex.put(inputIndex, candidate);
            if (current != null) {
                LOG.warn("Old Src for InputIndex: " + inputIndex + " with attemptNumber: " + current.getAttemptNumber() + " was not determined to be invalid. Ignoring it for now in favour of " + candidate.getAttemptNumber());
            }
        }

        List<InputAttemptIdentifier> batch = new ArrayList<>();
        int totalCandidates = newestByIndex.size();
        Iterator<InputAttemptIdentifier> remaining = newestByIndex.values().iterator();
        while (remaining.hasNext()) {
            batch.add(remaining.next());
            if (batch.size() >= maxTaskOutputAtOnce) {
                break;
            }
        }
        // Anything beyond the batch limit goes back to the host for a later round.
        while (remaining.hasNext()) {
            mapHost.addKnownMap(remaining.next());
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("assigned " + batch.size() + " of " + totalCandidates + " to " + mapHost + " to " + Thread.currentThread().getName());
        }
        return batch;
    }

    /**
     * Releases a host after a fetch round. Unless the host is penalized it is marked
     * available, and if that leaves it PENDING it is re-queued and waiters are notified.
     * The time since the calling thread's {@code getHost()} is logged.
     *
     * @param mapHost host being released by the calling thread
     */
    public synchronized void freeHost(MapHost mapHost) {
        if (mapHost.getState() != MapHost.State.PENALIZED) {
            if (mapHost.markAvailable() == MapHost.State.PENDING) {
                pendingHosts.add(mapHost);
                notifyAll();
            }
        }
        String threadName = Thread.currentThread().getName();
        long heldMillis = System.currentTimeMillis() - shuffleStart.get();
        LOG.info(mapHost + " freed by " + threadName + " in " + heldMillis + "ms");
    }

    /**
     * Forgets all known hosts, obsolete inputs, pending hosts and path-to-attempt mappings.
     */
    public synchronized void resetKnownMaps() {
        mapLocations.clear();
        obsoleteInputs.clear();
        pendingHosts.clear();
        pathToIdentifierMap.clear();
    }

    /**
     * @return true when no inputs remain to be fetched
     */
    public synchronized boolean isDone() {
        return remainingMaps == 0;
    }

    /**
     * Waits up to the given time for all inputs to be fetched.
     *
     * @param i maximum time to wait, in milliseconds, as passed to {@link Object#wait(long)}
     * @return true if no inputs remain, either on entry or after the wait returns
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized boolean waitUntilDone(int i) throws InterruptedException {
        if (remainingMaps > 0) {
            wait(i);
            return remainingMaps == 0;
        }
        return true;
    }

    /**
     * Builds the lookup key {@code "<path>_<reduceId>"} used by {@code pathToIdentifierMap}.
     */
    private String getIdentifierFromPathAndReduceId(String str, int i) {
        StringBuilder key = new StringBuilder();
        key.append(str).append("_").append(i);
        return key.toString();
    }

    /**
     * Stops the penalty referee thread and waits for it to terminate.
     *
     * @throws InterruptedException if interrupted while waiting for the thread to finish
     */
    public void close() throws InterruptedException {
        Referee penaltyThread = referee;
        penaltyThread.interrupt();
        penaltyThread.join();
    }

    /**
     * Raises the recorded maximum map runtime if the given value exceeds it.
     *
     * @param i a map runtime observation, in the same unit as {@code maxMapRuntime}
     */
    public synchronized void informMaxMapRunTime(int i) {
        if (maxMapRuntime >= i) {
            return;
        }
        maxMapRuntime = i;
    }

    /**
     * Marks the input at the given index as finished.
     *
     * @param i index of the input
     */
    void setInputFinished(int i) {
        final boolean[] flags = this.finishedMaps;
        synchronized (flags) {
            flags[i] = true;
        }
    }

    /**
     * Tells whether the input at the given index has been marked finished.
     *
     * @param i index of the input
     * @return true if the input is finished
     */
    boolean isInputFinished(int i) {
        final boolean[] flags = this.finishedMaps;
        synchronized (flags) {
            return flags[i];
        }
    }

    // Class initialization: assertion switch, per-thread fetch start time, and logger.
    static {
        // True when assertions are disabled for this class; consulted by addKnownMapOutput().
        $assertionsDisabled = !ShuffleScheduler.class.desiredAssertionStatus();
        // Each thread starts with 0 until getHost() stores the current time for it.
        shuffleStart = new ThreadLocal<Long>() { // from class: org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.lang.ThreadLocal
            public Long initialValue() {
                return 0L;
            }
        };
        LOG = LogFactory.getLog(ShuffleScheduler.class);
    }
}
