package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.class */
/**
 * DAG scheduler that forwards task attempt schedule requests to the event handler
 * one vertex at a time, in dependency order.
 *
 * <p>A vertex becomes "scheduled" once (a) at least one schedule request has been
 * seen for the vertex itself and (b) every input vertex has either had schedule
 * requests seen for all of its tasks or has zero tasks. Requests that arrive for a
 * vertex before it is scheduled are parked in {@link #pendingEvents} and flushed,
 * in arrival order, when the vertex becomes scheduled.
 *
 * <p>The class keeps plain (non-concurrent) collections and performs no
 * synchronization of its own.
 */
public class DAGSchedulerNaturalOrderControlled extends DAGScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(DAGSchedulerNaturalOrderControlled.class);

    private final DAG dag;

    // Raw on purpose: the public constructor accepts a raw EventHandler.
    @SuppressWarnings("rawtypes")
    private final EventHandler handler;

    /** Schedule events held back per vertex name until that vertex is scheduled. */
    private final ListMultimap<String, TaskAttemptEventSchedule> pendingEvents = LinkedListMultimap.create();

    /** Names of vertices whose schedule events are forwarded immediately. */
    private final Set<String> scheduledVertices = new HashSet<String>();

    /** Per vertex name, one bit per task id for which a schedule request has been seen. */
    private final Map<String, BitSet> vertexScheduledTasks = new HashMap<String, BitSet>();

    /**
     * @param dag          the DAG whose vertices are looked up when requests arrive
     * @param eventHandler receiver of the forwarded {@link TaskAttemptEventSchedule} events
     */
    @SuppressWarnings("rawtypes")
    public DAGSchedulerNaturalOrderControlled(DAG dag, EventHandler eventHandler) {
        this.dag = dag;
        this.handler = eventHandler;
    }

    /**
     * Handles a schedule request for a task attempt.
     *
     * <p>The request is always recorded as seen. If the attempt's vertex is already
     * scheduled the event is forwarded right away; otherwise an attempt is made to
     * schedule the vertex, and the event is parked if that fails. Whenever an event
     * is forwarded, downstream vertices are re-examined because this request may
     * have completed the set of requests they were waiting on.
     *
     * @param dAGEventSchedulerUpdate update carrying the attempt to schedule
     */
    @Override
    public void scheduleTaskEx(DAGEventSchedulerUpdate dAGEventSchedulerUpdate) {
        TaskAttempt attempt = dAGEventSchedulerUpdate.getAttempt();
        Vertex vertex = this.dag.getVertex(attempt.getVertexID());
        int priorityLowLimit = getPriorityLowLimit(this.dag, vertex);
        int priorityHighLimit = getPriorityHighLimit(this.dag, vertex);
        TaskAttemptEventSchedule scheduleEvent =
                new TaskAttemptEventSchedule(attempt.getTaskAttemptID(), priorityLowLimit, priorityHighLimit);

        // Record the request before any scheduling decision so that it counts
        // towards this vertex's own "has requests" check and its consumers' checks.
        taskAttemptSeen(vertex.getName(), attempt.getTaskAttemptID());

        if (vertexAlreadyScheduled(vertex)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Scheduling " + attempt.getTaskAttemptID() + " between priorityLow: " + priorityLowLimit + " and priorityHigh: " + priorityHighLimit);
            }
            sendEvent(scheduleEvent);
            processDownstreamVertices(vertex);
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Attempting to schedule vertex: " + vertex.getLogIdentifier() + " due to schedule event");
        }
        if (!trySchedulingVertex(vertex)) {
            // Sources are not ready yet; hold the event until the vertex is scheduled.
            this.pendingEvents.put(vertex.getName(), scheduleEvent);
            return;
        }

        LOG.info("Scheduled vertex: " + vertex.getLogIdentifier());
        // Flush earlier parked events first so arrival order is preserved.
        sendEventsForVertex(vertex.getName());
        sendEvent(scheduleEvent);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing downstream vertices for vertex: " + vertex.getLogIdentifier());
        }
        processDownstreamVertices(vertex);
    }

    /**
     * Records that a schedule request was seen for a vertex.
     *
     * <p>An entry for the vertex is created if absent. When {@code attemptId} is
     * non-null the bit for the id of the attempt's task is set, so several attempts
     * of the same task count once.
     *
     * @param vertexName name of the vertex the request belongs to
     * @param attemptId  the attempt that was requested, or {@code null} to only
     *                   create the (empty) entry for the vertex
     */
    private void taskAttemptSeen(String vertexName, TezTaskAttemptID attemptId) {
        BitSet seenTasks = this.vertexScheduledTasks.get(vertexName);
        if (seenTasks == null) {
            seenTasks = new BitSet();
            this.vertexScheduledTasks.put(vertexName, seenTasks);
        }
        if (attemptId != null) {
            seenTasks.set(attemptId.getTaskID().getId());
        }
    }

    /**
     * Removes all parked events for the vertex and forwards them in the order
     * they were parked.
     *
     * @param vertexName name of the vertex whose parked events are flushed
     */
    private void sendEventsForVertex(String vertexName) {
        for (TaskAttemptEventSchedule parked : this.pendingEvents.removeAll(vertexName)) {
            sendEvent(parked);
        }
    }

    /** Returns whether the vertex has already been marked as scheduled. */
    private boolean vertexAlreadyScheduled(Vertex vertex) {
        return this.scheduledVertices.contains(vertex.getName());
    }

    /**
     * Returns whether schedule requests have been seen for at least as many
     * distinct tasks as the vertex currently reports via {@code getTotalTasks()}.
     * A vertex with no recorded entry is never considered forwarded.
     */
    private boolean scheduledTasksForwarded(Vertex vertex) {
        BitSet seenTasks = this.vertexScheduledTasks.get(vertex.getName());
        return seenTasks != null && seenTasks.cardinality() >= vertex.getTotalTasks();
    }

    /**
     * Tries to schedule each not-yet-scheduled output vertex of {@code vertex};
     * for every one that becomes scheduled, flushes its parked events and then
     * repeats the process for that vertex's own outputs.
     *
     * @param vertex the vertex whose outputs are examined
     */
    private void processDownstreamVertices(Vertex vertex) {
        LinkedList<Vertex> newlyScheduled = Lists.newLinkedList();
        for (Vertex destVertex : vertex.getOutputVertices().keySet()) {
            if (vertexAlreadyScheduled(destVertex)) {
                continue;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Attempting to schedule vertex: " + destVertex.getLogIdentifier() + " due to upstream event from " + vertex.getLogIdentifier());
            }
            if (trySchedulingVertex(destVertex)) {
                LOG.info("Scheduled vertex: " + destVertex.getLogIdentifier() + " due to upstream event from " + vertex.getLogIdentifier());
                sendEventsForVertex(destVertex.getName());
                newlyScheduled.add(destVertex);
            }
        }
        // Descend only after all direct outputs have been examined.
        for (Vertex scheduled : newlyScheduled) {
            processDownstreamVertices(scheduled);
        }
    }

    /**
     * Decides whether the vertex can be scheduled now and, if so, marks it as
     * scheduled.
     *
     * <p>The vertex cannot be scheduled if no schedule request has been recorded
     * for it, or if any input vertex with a non-zero task count has not had
     * requests seen for all of its tasks. Input vertices reporting zero tasks are
     * marked as scheduled here as a side effect. Every input vertex is visited
     * even after a blocking one has been found.
     *
     * @param vertex the vertex to evaluate
     * @return {@code true} if the vertex was marked as scheduled by this call
     */
    private boolean trySchedulingVertex(Vertex vertex) {
        if (this.vertexScheduledTasks.get(vertex.getName()) == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No schedule requests for vertex: " + vertex.getLogIdentifier() + ", Not scheduling");
            }
            return false;
        }

        boolean canSchedule = true;
        Map<Vertex, Edge> inputVertices = vertex.getInputVertices();
        if (inputVertices == null || inputVertices.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No sources for vertex: " + vertex.getLogIdentifier() + ", Scheduling now");
            }
        } else {
            for (Vertex srcVertex : inputVertices.keySet()) {
                if (scheduledTasksForwarded(srcVertex)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Trying to schedule: " + vertex.getLogIdentifier() + ", All tasks forwarded for srcVertex: " + srcVertex.getLogIdentifier() + ", count: " + srcVertex.getTotalTasks());
                    }
                } else if (srcVertex.getTotalTasks() == 0) {
                    // A source with no tasks will never produce a schedule request,
                    // so treat it as scheduled and give it an (empty) entry.
                    LOG.info("Vertex: " + srcVertex.getLogIdentifier() + " has 0 tasks. Marking as scheduled");
                    this.scheduledVertices.add(srcVertex.getName());
                    taskAttemptSeen(srcVertex.getName(), null);
                } else {
                    if (LOG.isDebugEnabled()) {
                        BitSet forwarded = this.vertexScheduledTasks.get(srcVertex.getName());
                        Object forwardedCount = forwarded == null ? "null(0)" : Integer.valueOf(forwarded.cardinality());
                        LOG.debug("Not all sources schedule requests complete while trying to schedule: " + vertex.getLogIdentifier() + ", For source vertex: " + srcVertex.getLogIdentifier() + ", Forwarded requests: " + forwardedCount + " out of " + srcVertex.getTotalTasks());
                    }
                    canSchedule = false;
                }
            }
        }

        if (canSchedule) {
            this.scheduledVertices.add(vertex.getName());
        }
        return canSchedule;
    }

    /** No-op: this scheduler takes no action when a task completes. */
    @Override
    public void taskCompletedEx(DAGEventSchedulerUpdate dAGEventSchedulerUpdate) {
    }

    /** Hands the schedule event to the event handler. */
    @SuppressWarnings("unchecked") // handler is a raw EventHandler; see field declaration.
    private void sendEvent(TaskAttemptEventSchedule event) {
        this.handler.handle(event);
    }
}
