package org.apache.tez.dag.library.vertexmanager;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager.class */
public class FairShuffleVertexManager extends ShuffleVertexManagerBase {
    /** Configuration key for the desired input size per destination task; read with {@code getLong} in {@code initConfiguration()}. */
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = "tez.fair-shuffle-vertex-manager.desired-task-input-size";
    /** Configuration key selecting the {@link FairRoutingType}; its value is parsed with {@link FairRoutingType#fromString(String)}. */
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = "tez.fair-shuffle-vertex-manager.enable.auto-parallel";
    /** Configuration key for the minimum source-task completion fraction (a float). */
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = "tez.fair-shuffle-vertex-manager.min-src-fraction";
    /** Default value for {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION}. */
    public static final float TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
    /** Configuration key for the maximum source-task completion fraction (a float). */
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = "tez.fair-shuffle-vertex-manager.max-src-fraction";
    /**
     * Baseline default for {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION}. {@code initConfiguration()}
     * uses the larger of this value and the configured minimum fraction as the effective default.
     */
    public static final float TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
    /** Configuration of this manager; assigned by {@code initConfiguration()}. */
    FairShuffleVertexManagerConfig mgrConfig;
    private static final Logger LOG = LoggerFactory.getLogger(FairShuffleVertexManager.class);
    /**
     * Default value for {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE}.
     * NOTE(review): {@code MB} is declared outside this file - presumably the number of bytes in a megabyte; confirm.
     */
    public static final long TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT = 100 * MB;
    /** Default value for {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL}: the string form of {@link FairRoutingType#NONE}. */
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = FairRoutingType.NONE.getType();

    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager$FairRoutingType.class */
    /**
     * Routing modes understood by this vertex manager. Each constant carries the
     * lower-case string form that is stored in, and parsed from, the configuration.
     */
    public enum FairRoutingType {
        NONE("none"),
        REDUCE_PARALLELISM("reduce_parallelism"),
        FAIR_PARALLELISM("fair_parallelism");

        /** String form of this routing type. */
        private final String type;

        FairRoutingType(String str) {
            type = str;
        }

        /** @return the string form of this routing type */
        public final String getType() {
            return type;
        }

        /** @return {@code true} only for {@link #REDUCE_PARALLELISM} */
        public boolean reduceParallelismEnabled() {
            return this == REDUCE_PARALLELISM;
        }

        /** @return {@code true} only for {@link #FAIR_PARALLELISM} */
        public boolean fairParallelismEnabled() {
            return this == FAIR_PARALLELISM;
        }

        /** @return {@code true} for every routing type except {@link #NONE} */
        public boolean enabled() {
            return this != NONE;
        }

        /**
         * Parses a routing type from its string form, ignoring case.
         *
         * @param str the string to parse; may be {@code null}
         * @return the matching routing type
         * @throws IllegalArgumentException if {@code str} is {@code null} or matches no routing type
         */
        public static FairRoutingType fromString(String str) {
            for (FairRoutingType candidate : values()) {
                // equalsIgnoreCase(null) is false, so a null input falls through to the throw below.
                if (candidate.type.equalsIgnoreCase(str)) {
                    return candidate;
                }
            }
            throw new IllegalArgumentException("Invalid type " + str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager$FairShuffleVertexManagerConfig.class */
    /**
     * Base shuffle-vertex-manager configuration extended with the selected
     * {@link FairRoutingType}.
     */
    public static class FairShuffleVertexManagerConfig extends ShuffleVertexManagerBase.ShuffleVertexManagerBaseConfig {
        /** Routing type chosen for this vertex manager. */
        final FairRoutingType fairRoutingType;

        /**
         * Creates the configuration and logs the chosen routing type. The first four
         * arguments are forwarded unchanged to the base configuration; the descriptions
         * below reflect what {@code initConfiguration()} passes in.
         *
         * @param z whether auto-parallelism is enabled
         * @param j desired task input size
         * @param f minimum source completion fraction
         * @param f2 maximum source completion fraction
         * @param fairRoutingType routing type to use
         */
        public FairShuffleVertexManagerConfig(boolean z, long j, float f, float f2, FairRoutingType fairRoutingType) {
            super(z, j, f, f2);
            this.fairRoutingType = fairRoutingType;
            FairShuffleVertexManager.LOG.info("fairRoutingType {}", fairRoutingType);
        }

        /** @return the routing type supplied at construction */
        FairRoutingType getFairRoutingType() {
            return fairRoutingType;
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager$FairShuffleVertexManagerConfigBuilder.class */
    /**
     * Fluent helper that writes this vertex manager's settings into a
     * {@link Configuration} and wraps the result in a {@link VertexManagerPluginDescriptor}.
     */
    public static final class FairShuffleVertexManagerConfigBuilder {
        /** Configuration the setters write to; the caller's instance is used directly when one is supplied. */
        private final Configuration conf;

        /**
         * @param configuration configuration to write into, or {@code null} to start from
         *        {@code new Configuration(false)}
         */
        private FairShuffleVertexManagerConfigBuilder(@Nullable Configuration configuration) {
            this.conf = (configuration != null) ? configuration : new Configuration(false);
        }

        /**
         * Stores the routing type under {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL}
         * using the enum's {@code toString()} form.
         *
         * @return this builder
         */
        public FairShuffleVertexManagerConfigBuilder setAutoParallelism(FairRoutingType fairRoutingType) {
            conf.set(TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, fairRoutingType.toString());
            return this;
        }

        /**
         * Stores {@code f} under {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION}.
         *
         * @return this builder
         */
        public FairShuffleVertexManagerConfigBuilder setSlowStartMinSrcCompletionFraction(float f) {
            conf.setFloat(TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, f);
            return this;
        }

        /**
         * Stores {@code f} under {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION}.
         *
         * @return this builder
         */
        public FairShuffleVertexManagerConfigBuilder setSlowStartMaxSrcCompletionFraction(float f) {
            conf.setFloat(TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, f);
            return this;
        }

        /**
         * Stores {@code j} under {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE}.
         *
         * @return this builder
         */
        public FairShuffleVertexManagerConfigBuilder setDesiredTaskInputSize(long j) {
            conf.setLong(TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, j);
            return this;
        }

        /**
         * Creates a plugin descriptor for {@link FairShuffleVertexManager} whose user
         * payload is built from the accumulated configuration.
         *
         * @return the descriptor carrying the configuration payload
         * @throws TezUncheckedException wrapping any {@link IOException} raised while creating the payload
         */
        public VertexManagerPluginDescriptor build() {
            final String pluginClassName = FairShuffleVertexManager.class.getName();
            try {
                VertexManagerPluginDescriptor descriptor = VertexManagerPluginDescriptor.create(pluginClassName);
                return descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
            } catch (IOException e) {
                throw new TezUncheckedException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager$FairSourceVertexInfo.class */
    /**
     * Source vertex bookkeeping extended with a map from an integer destination index
     * to the {@link DestinationTaskInputsProperty} describing that destination's inputs.
     */
    public static class FairSourceVertexInfo extends ShuffleVertexManagerBase.SourceVertexInfo {
        /** Destination index to inputs property; starts empty. */
        private final HashMap<Integer, DestinationTaskInputsProperty> destinationInputsProperties = new HashMap<>();

        FairSourceVertexInfo(EdgeProperty edgeProperty, int i) {
            super(edgeProperty, i);
        }

        /** @return the live (mutable) map of destination index to inputs property */
        public HashMap<Integer, DestinationTaskInputsProperty> getDestinationInputsProperties() {
            return destinationInputsProperties;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/FairShuffleVertexManager$PartitionsGroupingCalculator.class */
    public class PartitionsGroupingCalculator implements Iterable<DestinationTaskInputsProperty> {
        private final FairSourceVertexInfo sourceVertexInfo;
        private long[] estimatedPartitionOutputSize;
        private long sizeOfPartitions = 0;
        private int numOfPartitions = 0;
        private int firstPartitionId = 0;
        private int numOfBaseSourceTasks = 0;
        private int numOfBaseDestinationTasks = 0;

        public PartitionsGroupingCalculator(long[] jArr, FairSourceVertexInfo fairSourceVertexInfo) {
            this.estimatedPartitionOutputSize = jArr;
            this.sourceVertexInfo = fairSourceVertexInfo;
        }

        private void startNextPartitionsGroup() {
            this.firstPartitionId += this.numOfPartitions;
            this.sizeOfPartitions = 0L;
            this.numOfPartitions = 0;
            this.numOfBaseSourceTasks = 0;
            this.numOfBaseDestinationTasks = 0;
        }

        private int getNextPartitionId() {
            return this.firstPartitionId + this.numOfPartitions;
        }

        private void addNextPartition() {
            if (hasPartitionsLeft()) {
                this.sizeOfPartitions += this.estimatedPartitionOutputSize[getNextPartitionId()];
                this.numOfPartitions++;
            }
        }

        private boolean hasPartitionsLeft() {
            return getNextPartitionId() < this.estimatedPartitionOutputSize.length;
        }

        private long getCurrentAndNextPartitionSize() {
            return hasPartitionsLeft() ? this.sizeOfPartitions + this.estimatedPartitionOutputSize[getNextPartitionId()] : this.sizeOfPartitions;
        }

        private boolean computeSourceTasksGrouping() {
            boolean z = true;
            int checkedCast = Ints.checkedCast(FairShuffleVertexManager.ceil(getCurrentAndNextPartitionSize(), FairShuffleVertexManager.this.config.getDesiredTaskInputDataSize()));
            if (checkedCast <= 1) {
                addNextPartition();
                if (hasPartitionsLeft()) {
                    z = false;
                } else {
                    this.numOfBaseDestinationTasks = 1;
                    this.numOfBaseSourceTasks = this.sourceVertexInfo.numTasks;
                }
            } else if (this.numOfPartitions == 0) {
                addNextPartition();
                if (FairShuffleVertexManager.this.mgrConfig.getFairRoutingType().reduceParallelismEnabled()) {
                    this.numOfBaseDestinationTasks = 1;
                    this.numOfBaseSourceTasks = this.sourceVertexInfo.numTasks;
                } else if (this.sourceVertexInfo.numTasks >= checkedCast) {
                    this.numOfBaseDestinationTasks = checkedCast - (this.sourceVertexInfo.numTasks % checkedCast);
                    this.numOfBaseSourceTasks = this.sourceVertexInfo.numTasks / checkedCast;
                } else {
                    this.numOfBaseDestinationTasks = this.sourceVertexInfo.numTasks;
                    this.numOfBaseSourceTasks = 1;
                }
            } else {
                this.numOfBaseDestinationTasks = 1;
                this.numOfBaseSourceTasks = this.sourceVertexInfo.numTasks;
            }
            return z;
        }

        @Override // java.lang.Iterable
        public Iterator<DestinationTaskInputsProperty> iterator() {
            return new UnmodifiableIterator<DestinationTaskInputsProperty>() { // from class: org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager.PartitionsGroupingCalculator.1
                private int j = 0;
                private boolean visitedAtLeastOnce = false;
                private int groupIndex = 0;

                private int getNumOfSourceTasks() {
                    int i = this.groupIndex;
                    this.groupIndex = i + 1;
                    return i < PartitionsGroupingCalculator.this.numOfBaseDestinationTasks ? PartitionsGroupingCalculator.this.numOfBaseSourceTasks : PartitionsGroupingCalculator.this.numOfBaseSourceTasks + 1;
                }

                public boolean hasNext() {
                    return this.j < PartitionsGroupingCalculator.this.sourceVertexInfo.numTasks || !this.visitedAtLeastOnce;
                }

                /* renamed from: next, reason: merged with bridge method [inline-methods] */
                public DestinationTaskInputsProperty m144next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    this.visitedAtLeastOnce = true;
                    int i = this.j;
                    int numOfSourceTasks = getNumOfSourceTasks();
                    this.j += numOfSourceTasks;
                    return new DestinationTaskInputsProperty(PartitionsGroupingCalculator.this.firstPartitionId, PartitionsGroupingCalculator.this.numOfPartitions, i, numOfSourceTasks);
                }
            };
        }

        public void compute() {
            int i = 0;
            while (hasPartitionsLeft()) {
                if (computeSourceTasksGrouping()) {
                    Iterator<DestinationTaskInputsProperty> it = iterator();
                    while (it.hasNext()) {
                        DestinationTaskInputsProperty next = it.next();
                        this.sourceVertexInfo.getDestinationInputsProperties().put(Integer.valueOf(i), next);
                        i++;
                        FairShuffleVertexManager.LOG.info("Destination Index {}: Input Property {}", Integer.valueOf(i), next);
                    }
                    startNextPartitionsGroup();
                }
            }
        }
    }

    /**
     * Creates the per-source bookkeeping object used by this manager.
     *
     * @return a new {@link FairSourceVertexInfo} built from the given arguments
     */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    ShuffleVertexManagerBase.SourceVertexInfo createSourceVertexInfo(EdgeProperty edgeProperty, int i) {
        return new FairSourceVertexInfo(edgeProperty, i);
    }

    /**
     * Creates the vertex manager; the context is passed straight to the superclass constructor.
     *
     * @param vertexManagerPluginContext plugin context forwarded to the base class
     */
    public FairShuffleVertexManager(VertexManagerPluginContext vertexManagerPluginContext) {
        super(vertexManagerPluginContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the base-class start check and then rejects fair parallelism when there is
     * more than one bipartite source.
     *
     * @throws TezUncheckedException if {@code bipartiteSources > 1} and the configured
     *         routing type is {@link FairRoutingType#FAIR_PARALLELISM}
     */
    @Override
    public void onVertexStartedCheck() {
        super.onVertexStartedCheck();
        if (bipartiteSources <= 1) {
            return;
        }
        if (mgrConfig.getFairRoutingType().fairParallelismEnabled()) {
            throw new TezUncheckedException("Having more than one destination task process same partition(s) only works with one bipartite source.");
        }
    }

    /**
     * Integer division of {@code j} by {@code j2}, rounded up, computed as
     * {@code (j + j2 - 1) / j2}.
     *
     * @param j the dividend
     * @param j2 the divisor
     * @return the rounded-up quotient
     */
    static long ceil(long j, long j2) {
        final long roundUpBias = j2 - 1;
        final long biasedDividend = j + roundUpBias;
        return biasedDividend / j2;
    }

    /**
     * Estimates one size per index in {@code pendingTasks}.
     *
     * <p>If any index has currently known stats greater than zero, every entry is taken
     * from {@code getExpectedStatsAtIndex} and logged. Otherwise the expected total
     * bipartite source output size is divided evenly (integer division) across all entries.
     *
     * @return an array with {@code pendingTasks.size()} estimated sizes
     */
    public long[] estimatePartitionSize() {
        final int partitionCount = pendingTasks.size();
        final long[] estimatedSizes = new long[partitionCount];

        // Stop scanning at the first index that reports any stats.
        boolean statsAvailable = false;
        for (int idx = 0; idx < partitionCount; idx++) {
            if (getCurrentlyKnownStatsAtIndex(idx) > 0) {
                statsAvailable = true;
                break;
            }
        }

        if (statsAvailable) {
            for (int idx = 0; idx < partitionCount; idx++) {
                estimatedSizes[idx] = getExpectedStatsAtIndex(idx);
                LOG.info("Partition index {} with size {}", idx, estimatedSizes[idx]);
            }
            return estimatedSizes;
        }

        if (partitionCount > 0) {
            final BigInteger divisor = BigInteger.valueOf(partitionCount);
            final long evenShare = getExpectedTotalBipartiteSourceTasksOutputSize().divide(divisor).longValue();
            for (int idx = 0; idx < partitionCount; idx++) {
                estimatedSizes[idx] = evenShare;
            }
        }
        return estimatedSizes;
    }

    /**
     * Computes the destination-task grouping for every bipartite source and attaches a
     * {@link FairShuffleEdgeManager} descriptor, carrying the matching
     * {@link FairEdgeConfiguration} payload, to each source's {@code newDescriptor}.
     *
     * @return reconfiguration parameters holding the resulting parallelism and a {@code null} second argument
     * @throws IllegalStateException if two sources end up with different parallelism
     */
    @Override
    public ShuffleVertexManagerBase.ReconfigVertexParams computeRouting() {
        final int numPartitions = pendingTasks.size();
        final long[] partitionSizes = estimatePartitionSize();
        int parallelism = 0;
        for (Map.Entry<String, ShuffleVertexManagerBase.SourceVertexInfo> sourceEntry : getBipartiteInfo()) {
            final FairSourceVertexInfo sourceInfo = (FairSourceVertexInfo) sourceEntry.getValue();
            computeParallelism(partitionSizes, sourceInfo);

            final HashMap<Integer, DestinationTaskInputsProperty> destinationInputs = sourceInfo.getDestinationInputsProperties();
            final int sourceParallelism = destinationInputs.size();
            if (parallelism != 0) {
                Preconditions.checkState(parallelism == sourceParallelism, "the parallelism shall be the same for source vertices");
            }
            parallelism = sourceParallelism;

            final FairEdgeConfiguration edgeConfiguration = new FairEdgeConfiguration(numPartitions, destinationInputs);
            final EdgeManagerPluginDescriptor edgeDescriptor = EdgeManagerPluginDescriptor.create(FairShuffleEdgeManager.class.getName());
            edgeDescriptor.setUserPayload(edgeConfiguration.getBytePayload());
            sourceEntry.getValue().newDescriptor = edgeDescriptor;
        }
        return new ShuffleVertexManagerBase.ReconfigVertexParams(parallelism, null);
    }

    /** Intentionally empty: this manager does nothing in this base-class hook. */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    void postReconfigVertex() {
    }

    /** Intentionally empty: this manager does nothing in this base-class hook. */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    void processPendingTasks() {
    }

    /**
     * Runs a {@link PartitionsGroupingCalculator} over the given partition sizes, which
     * fills the source vertex's destination-inputs map.
     *
     * @param jArr estimated size per partition
     * @param fairSourceVertexInfo source vertex whose destination-inputs map is filled
     */
    private void computeParallelism(long[] jArr, FairSourceVertexInfo fairSourceVertexInfo) {
        final PartitionsGroupingCalculator calculator = new PartitionsGroupingCalculator(jArr, fairSourceVertexInfo);
        calculator.compute();
    }

    /**
     * Selects pending tasks to schedule and removes each selected task from
     * {@code pendingTasks}.
     *
     * <p>A pending task is selected when any of the following holds: every pending task
     * is being scheduled, auto-parallelism is disabled, no completed source task was
     * supplied, the source has no destination-inputs entries yet, or the destination's
     * input range contains the completed source task.
     *
     * @param taskAttemptIdentifier the completed source task attempt, or {@code null}
     * @return the schedule requests, or {@code null} when the number of tasks to schedule is not positive
     */
    @Override
    List<VertexManagerPluginContext.ScheduleTaskRequest> getTasksToSchedule(TaskAttemptIdentifier taskAttemptIdentifier) {
        int numOfTasksToSchedule = getNumOfTasksToScheduleAndLog(getMinSourceVertexCompletedTaskFraction());
        if (numOfTasksToSchedule <= 0) {
            return null;
        }
        final boolean scheduleAll = numOfTasksToSchedule == this.pendingTasks.size();
        // Typed list (instead of a raw ArrayList) so the return needs no unchecked conversion.
        final List<VertexManagerPluginContext.ScheduleTaskRequest> tasksToSchedule = Lists.newArrayListWithCapacity(numOfTasksToSchedule);
        final Iterator<ShuffleVertexManagerBase.PendingTaskInfo> pendingIterator = this.pendingTasks.iterator();

        FairSourceVertexInfo sourceVertexInfo = null;
        int completedSourceTaskIndex = 0;
        if (taskAttemptIdentifier != null) {
            completedSourceTaskIndex = taskAttemptIdentifier.getTaskIdentifier().getIdentifier();
            sourceVertexInfo = (FairSourceVertexInfo) getSourceVertexInfo(taskAttemptIdentifier.getTaskIdentifier().getVertexIdentifier().getName());
        }

        while (pendingIterator.hasNext() && numOfTasksToSchedule > 0) {
            final Integer taskIndex = Integer.valueOf(pendingIterator.next().getIndex());
            // NOTE(review): the map lookup below assumes an entry exists for every pending
            // task index once the map is non-empty - confirm against the reconfiguration path.
            if (scheduleAll
                    || !this.config.isAutoParallelismEnabled()
                    || sourceVertexInfo == null
                    || sourceVertexInfo.getDestinationInputsProperties().size() <= 0
                    || sourceVertexInfo.getDestinationInputsProperties().get(taskIndex).isSourceTaskInRange(completedSourceTaskIndex)) {
                tasksToSchedule.add(VertexManagerPluginContext.ScheduleTaskRequest.create(taskIndex.intValue(), (TaskLocationHint) null));
                pendingIterator.remove();
                numOfTasksToSchedule--;
            } else {
                LOG.debug("completedSourceTaskIndex {} and taskIndex {} don't connect.", Integer.valueOf(completedSourceTaskIndex), taskIndex);
            }
        }
        return tasksToSchedule;
    }

    /**
     * Reads this manager's settings from {@code conf}, stores the resulting
     * configuration in {@link #mgrConfig}, and returns it.
     *
     * <p>When the maximum source fraction is not configured, it defaults to the larger
     * of the minimum fraction and {@link #TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT}.
     *
     * @return the newly created configuration
     * @throws IllegalArgumentException if the configured routing type string is not recognized
     */
    @Override
    ShuffleVertexManagerBase.ShuffleVertexManagerBaseConfig initConfiguration() {
        final float minSrcFraction = conf.getFloat(
            TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
            TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
        final String routingSetting = conf.get(
            TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
            TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT);
        final FairRoutingType routingType = FairRoutingType.fromString(routingSetting);
        final boolean autoParallelismEnabled = routingType.enabled();
        final long desiredTaskInputSize = conf.getLong(
            TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
            TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);
        final float maxSrcFraction = conf.getFloat(
            TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
            Math.max(minSrcFraction, TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT));
        mgrConfig = new FairShuffleVertexManagerConfig(
            autoParallelismEnabled, desiredTaskInputSize, minSrcFraction, maxSrcFraction, routingType);
        return mgrConfig;
    }

    /**
     * Creates a configuration builder for this vertex manager.
     *
     * @param configuration configuration the builder writes into; when {@code null} the
     *        builder starts from {@code new Configuration(false)}
     * @return a new builder
     */
    public static FairShuffleVertexManagerConfigBuilder createConfigBuilder(@Nullable Configuration configuration) {
        return new FairShuffleVertexManagerConfigBuilder(configuration);
    }

    /** Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass implementation. */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    public /* bridge */ /* synthetic */ void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List list) {
        super.onRootVertexInitialized(str, inputDescriptor, list);
    }

    /** Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass implementation. */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    public /* bridge */ /* synthetic */ void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        super.onVertexStateUpdated(vertexStateUpdate);
    }

    /** Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass implementation. */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    public /* bridge */ /* synthetic */ void initialize() {
        super.initialize();
    }

    /** Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass implementation. */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    public /* bridge */ /* synthetic */ void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
        super.onVertexManagerEventReceived(vertexManagerEvent);
    }

    /** Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass implementation. */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    public /* bridge */ /* synthetic */ void onSourceTaskCompleted(TaskAttemptIdentifier taskAttemptIdentifier) {
        super.onSourceTaskCompleted(taskAttemptIdentifier);
    }

    /** Bridge method (marked bridge/synthetic by the decompiler) that only forwards to the superclass implementation. */
    @Override // org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase
    public /* bridge */ /* synthetic */ void onVertexStarted(List list) {
        super.onVertexStarted(list);
    }
}
