package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;

/* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.class */
public class ShuffleVertexManager implements VertexManagerPlugin {
    private static final String TEZ_AM_PREFIX = "tez.am.";
    public static final String TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = "tez.am.shuffle-vertex-manager.min-src-fraction";
    public static final float TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
    public static final String TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = "tez.am.shuffle-vertex-manager.max-src-fraction";
    public static final float TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
    public static final String TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = "tez.am.shuffle-vertex-manager.enable.auto-parallel";
    public static final boolean TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false;
    public static final String TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = "tez.am.shuffle-vertex-manager.desired-task-input-size";
    public static final long TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT = 104857600;
    public static final String TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM = "tez.am.shuffle-vertex-manager.min-task-parallelism";
    public static final int TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1;
    private static final Log LOG = LogFactory.getLog(ShuffleVertexManager.class);
    VertexManagerPluginContext context;
    float slowStartMinSrcCompletionFraction;
    float slowStartMaxSrcCompletionFraction;
    ArrayList<Integer> pendingTasks;
    long desiredTaskInputDataSize = TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT;
    int minTaskParallelism = 1;
    boolean enableAutoParallelism = false;
    boolean parallelismDetermined = false;
    int numSourceTasks = 0;
    int numSourceTasksCompleted = 0;
    int numVertexManagerEventsReceived = 0;
    int totalTasksToSchedule = 0;
    Map<String, Set<Integer>> bipartiteSources = Maps.newHashMap();
    long completedSourceTasksOutputSize = 0;

    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager$CustomShuffleEdgeManager.class */
    public static class CustomShuffleEdgeManager implements EdgeManager {
        int numSourceTaskOutputs;
        int numDestinationTasks;
        int basePartitionRange;
        int remainderRangeForLastShuffler;

        public void initialize(EdgeManagerContext edgeManagerContext) {
            byte[] userPayload = edgeManagerContext.getUserPayload();
            if (userPayload == null || userPayload.length == 0) {
                throw new RuntimeException("Could not initialize CustomShuffleEdgeManager from provided user payload");
            }
            try {
                CustomShuffleEdgeManagerConfig fromUserPayload = CustomShuffleEdgeManagerConfig.fromUserPayload(userPayload);
                this.numSourceTaskOutputs = fromUserPayload.numSourceTaskOutputs;
                this.numDestinationTasks = fromUserPayload.numDestinationTasks;
                this.basePartitionRange = fromUserPayload.basePartitionRange;
                this.remainderRangeForLastShuffler = fromUserPayload.remainderRangeForLastShuffler;
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Could not initialize CustomShuffleEdgeManager from provided user payload", e);
            }
        }

        public int getNumDestinationTaskPhysicalInputs(int i, int i2) {
            return i * (i2 < this.numDestinationTasks - 1 ? this.basePartitionRange : this.remainderRangeForLastShuffler);
        }

        public int getNumSourceTaskPhysicalOutputs(int i, int i2) {
            return this.numSourceTaskOutputs;
        }

        public void routeDataMovementEventToDestination(DataMovementEvent dataMovementEvent, int i, int i2, Map<Integer, List<Integer>> map) {
            int sourceIndex = dataMovementEvent.getSourceIndex();
            int i3 = sourceIndex / this.basePartitionRange;
            int i4 = i3 < i2 - 1 ? this.basePartitionRange : this.remainderRangeForLastShuffler;
            map.put(new Integer((i * i4) + (sourceIndex % i4)), Collections.singletonList(new Integer(i3)));
        }

        public void routeInputSourceTaskFailedEventToDestination(int i, int i2, Map<Integer, List<Integer>> map) {
            if (this.remainderRangeForLastShuffler >= this.basePartitionRange) {
                ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(i2);
                for (int i3 = 0; i3 < i2; i3++) {
                    newArrayListWithCapacity.add(new Integer(i3));
                }
                int i4 = i * this.basePartitionRange;
                for (int i5 = 0; i5 < this.basePartitionRange; i5++) {
                    map.put(new Integer(i4 + i5), newArrayListWithCapacity);
                }
                return;
            }
            List<Integer> singletonList = Collections.singletonList(new Integer(i2 - 1));
            ArrayList newArrayListWithCapacity2 = Lists.newArrayListWithCapacity(i2 - 1);
            for (int i6 = 0; i6 < i2 - 1; i6++) {
                newArrayListWithCapacity2.add(new Integer(i6));
            }
            int i7 = i * this.basePartitionRange;
            for (int i8 = 0; i8 < this.basePartitionRange; i8++) {
                map.put(new Integer(i7 + i8), newArrayListWithCapacity2);
            }
            int i9 = i * this.remainderRangeForLastShuffler;
            for (int i10 = 0; i10 < this.remainderRangeForLastShuffler; i10++) {
                map.put(new Integer(i9 + i10), singletonList);
            }
        }

        public int routeInputErrorEventToSource(InputReadErrorEvent inputReadErrorEvent, int i) {
            return inputReadErrorEvent.getIndex() / (i < this.numDestinationTasks - 1 ? this.basePartitionRange : this.remainderRangeForLastShuffler);
        }

        public int getNumDestinationConsumerTasks(int i, int i2) {
            return i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager$CustomShuffleEdgeManagerConfig.class */
    public static class CustomShuffleEdgeManagerConfig {
        int numSourceTaskOutputs;
        int numDestinationTasks;
        int basePartitionRange;
        int remainderRangeForLastShuffler;

        private CustomShuffleEdgeManagerConfig(int i, int i2, int i3, int i4) {
            this.numSourceTaskOutputs = i;
            this.numDestinationTasks = i2;
            this.basePartitionRange = i3;
            this.remainderRangeForLastShuffler = i4;
        }

        public byte[] toUserPayload() {
            return ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto.newBuilder().setNumSourceTaskOutputs(this.numSourceTaskOutputs).setNumDestinationTasks(this.numDestinationTasks).setBasePartitionRange(this.basePartitionRange).setRemainderRangeForLastShuffler(this.remainderRangeForLastShuffler).m167build().toByteArray();
        }

        public static CustomShuffleEdgeManagerConfig fromUserPayload(byte[] bArr) throws InvalidProtocolBufferException {
            ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto parseFrom = ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto.parseFrom(bArr);
            return new CustomShuffleEdgeManagerConfig(parseFrom.getNumSourceTaskOutputs(), parseFrom.getNumDestinationTasks(), parseFrom.getBasePartitionRange(), parseFrom.getRemainderRangeForLastShuffler());
        }
    }

    public void onVertexStarted(Map<String, List<Integer>> map) {
        this.pendingTasks = new ArrayList<>(this.context.getVertexNumTasks(this.context.getVertexName()));
        updatePendingTasks();
        updateSourceTaskCount();
        LOG.info("OnVertexStarted vertex: " + this.context.getVertexName() + " with " + this.numSourceTasks + " source tasks and " + this.totalTasksToSchedule + " pending tasks");
        if (map != null) {
            for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
                Iterator<Integer> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    onSourceTaskCompleted(entry.getKey(), it.next());
                }
            }
        }
        schedulePendingTasks();
    }

    public void onSourceTaskCompleted(String str, Integer num) {
        updateSourceTaskCount();
        Set<Integer> set = this.bipartiteSources.get(str);
        if (set != null) {
            if (set.add(num)) {
                this.numSourceTasksCompleted++;
            }
            schedulePendingTasks();
        }
    }

    public void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
        if (this.enableAutoParallelism) {
            try {
                long outputSize = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(vertexManagerEvent.getUserPayload()).getOutputSize();
                this.numVertexManagerEventsReceived++;
                this.completedSourceTasksOutputSize += outputSize;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received info of output size: " + outputSize + " numInfoReceived: " + this.numVertexManagerEventsReceived + " total output size: " + this.completedSourceTasksOutputSize);
                }
            } catch (InvalidProtocolBufferException e) {
                throw new TezUncheckedException(e);
            }
        }
    }

    void updatePendingTasks() {
        this.pendingTasks.clear();
        for (int i = 0; i < this.context.getVertexNumTasks(this.context.getVertexName()); i++) {
            this.pendingTasks.add(new Integer(i));
        }
        this.totalTasksToSchedule = this.pendingTasks.size();
    }

    void updateSourceTaskCount() {
        int i = 0;
        Iterator<String> it = this.bipartiteSources.keySet().iterator();
        while (it.hasNext()) {
            i += this.context.getVertexNumTasks(it.next());
        }
        this.numSourceTasks = i;
    }

    void determineParallelismAndApply() {
        if (this.numSourceTasksCompleted == 0 || this.numVertexManagerEventsReceived == 0) {
            return;
        }
        int size = this.pendingTasks.size();
        long j = (this.numSourceTasks * this.completedSourceTasksOutputSize) / this.numVertexManagerEventsReceived;
        int i = (int) (((j + this.desiredTaskInputDataSize) - 1) / this.desiredTaskInputDataSize);
        if (i < this.minTaskParallelism) {
            i = this.minTaskParallelism;
        }
        if (i >= size) {
            return;
        }
        int i2 = size / i;
        if (i2 <= 1) {
            return;
        }
        int i3 = size / i2;
        int i4 = size % i2;
        int i5 = i4 > 0 ? i3 + 1 : i3;
        LOG.info("Reduce auto parallelism for vertex: " + this.context.getVertexName() + " to " + i5 + " from " + this.pendingTasks.size() + " . Expected output: " + j + " based on actual output: " + this.completedSourceTasksOutputSize + " from " + this.numVertexManagerEventsReceived + " vertex manager events.  desiredTaskInputSize: " + this.desiredTaskInputDataSize);
        if (i5 < size) {
            HashMap hashMap = new HashMap(this.bipartiteSources.size());
            for (String str : this.bipartiteSources.keySet()) {
                CustomShuffleEdgeManagerConfig customShuffleEdgeManagerConfig = new CustomShuffleEdgeManagerConfig(size, i5, i2, i4 > 0 ? i4 : i2);
                EdgeManagerDescriptor edgeManagerDescriptor = new EdgeManagerDescriptor(CustomShuffleEdgeManager.class.getName());
                edgeManagerDescriptor.setUserPayload(customShuffleEdgeManagerConfig.toUserPayload());
                hashMap.put(str, edgeManagerDescriptor);
            }
            this.context.setVertexParallelism(i5, (VertexLocationHint) null, hashMap);
            updatePendingTasks();
        }
    }

    void schedulePendingTasks(int i) {
        if (this.enableAutoParallelism && !this.parallelismDetermined) {
            this.parallelismDetermined = true;
            determineParallelismAndApply();
        }
        ArrayList arrayList = new ArrayList(i);
        while (!this.pendingTasks.isEmpty() && i > 0) {
            i--;
            arrayList.add(this.pendingTasks.get(0));
            this.pendingTasks.remove(0);
        }
        this.context.scheduleVertexTasks(arrayList);
    }

    void schedulePendingTasks() {
        int size = this.pendingTasks.size();
        if (size == 0) {
            return;
        }
        if (this.numSourceTasksCompleted == this.numSourceTasks && size > 0) {
            LOG.info("All source tasks assigned. Ramping up " + size + " remaining tasks for vertex: " + this.context.getVertexName());
            schedulePendingTasks(size);
            return;
        }
        float f = this.numSourceTasks != 0 ? this.numSourceTasksCompleted / this.numSourceTasks : 1.0f;
        float f2 = 1.0f;
        float f3 = this.slowStartMaxSrcCompletionFraction - this.slowStartMinSrcCompletionFraction;
        if (f3 > 0.0f) {
            f2 = (f - this.slowStartMinSrcCompletionFraction) / f3;
        } else if (f < this.slowStartMinSrcCompletionFraction) {
            f2 = 0.0f;
        }
        if (f2 > 1.0f) {
            f2 = 1.0f;
        } else if (f2 < 0.0f) {
            f2 = 0.0f;
        }
        int i = ((int) (f2 * this.totalTasksToSchedule)) - (this.totalTasksToSchedule - size);
        if (i > 0) {
            LOG.info("Scheduling " + i + " tasks for vertex: " + this.context.getVertexName() + " with totalTasks: " + this.totalTasksToSchedule + ". " + this.numSourceTasksCompleted + " source tasks completed out of " + this.numSourceTasks + ". SourceTaskCompletedFraction: " + f + " min: " + this.slowStartMinSrcCompletionFraction + " max: " + this.slowStartMaxSrcCompletionFraction);
            schedulePendingTasks(i);
        }
    }

    public void initialize(VertexManagerPluginContext vertexManagerPluginContext) {
        try {
            Configuration createConfFromUserPayload = TezUtils.createConfFromUserPayload(vertexManagerPluginContext.getUserPayload());
            this.context = vertexManagerPluginContext;
            this.slowStartMinSrcCompletionFraction = createConfFromUserPayload.getFloat(TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0.25f);
            this.slowStartMaxSrcCompletionFraction = createConfFromUserPayload.getFloat(TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 0.75f);
            if (this.slowStartMinSrcCompletionFraction < 0.0f || this.slowStartMaxSrcCompletionFraction < this.slowStartMinSrcCompletionFraction) {
                throw new IllegalArgumentException("Invalid values for slowStartMinSrcCompletionFraction/slowStartMaxSrcCompletionFraction. Min cannot be < 0 and max cannot be < min.");
            }
            this.enableAutoParallelism = createConfFromUserPayload.getBoolean(TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, false);
            this.desiredTaskInputDataSize = createConfFromUserPayload.getLong(TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);
            this.minTaskParallelism = createConfFromUserPayload.getInt(TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM, 1);
            LOG.info("Shuffle Vertex Manager: settings minFrac:" + this.slowStartMinSrcCompletionFraction + " maxFrac:" + this.slowStartMaxSrcCompletionFraction + " auto:" + this.enableAutoParallelism + " desiredTaskIput:" + this.desiredTaskInputDataSize + " minTasks:" + this.minTaskParallelism);
            for (Map.Entry entry : vertexManagerPluginContext.getInputVertexEdgeProperties().entrySet()) {
                if (((EdgeProperty) entry.getValue()).getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER) {
                    this.bipartiteSources.put((String) entry.getKey(), new HashSet());
                }
            }
            if (this.bipartiteSources.isEmpty()) {
                throw new TezUncheckedException("Atleast 1 bipartite source should exist");
            }
        } catch (IOException e) {
            throw new TezUncheckedException(e);
        }
    }

    public void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List<Event> list) {
    }
}
