/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DAGSchedulerNaturalOrderControlled
extends DAGScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(DAGSchedulerNaturalOrderControlled.class);
    private final DAG dag;
    private final EventHandler handler;
    private final ListMultimap<String, TaskAttemptEventSchedule> pendingEvents = LinkedListMultimap.create();
    private final Set<String> scheduledVertices = new HashSet<String>();
    private final Map<String, BitSet> vertexScheduledTasks = new HashMap<String, BitSet>();

    public DAGSchedulerNaturalOrderControlled(DAG dag, EventHandler dispatcher) {
        this.dag = dag;
        this.handler = dispatcher;
    }

    @Override
    public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
        TaskAttempt attempt = event.getAttempt();
        Vertex vertex = this.dag.getVertex(attempt.getVertexID());
        int priorityLowLimit = this.getPriorityLowLimit(this.dag, vertex);
        int priorityHighLimit = this.getPriorityHighLimit(this.dag, vertex);
        TaskAttemptEventSchedule attemptEvent = new TaskAttemptEventSchedule(attempt.getTaskAttemptID(), priorityLowLimit, priorityHighLimit);
        this.taskAttemptSeen(vertex.getName(), attempt.getTaskAttemptID());
        if (this.vertexAlreadyScheduled(vertex)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Scheduling " + attempt.getTaskAttemptID() + " between priorityLow: " + priorityLowLimit + " and priorityHigh: " + priorityHighLimit);
            }
            this.sendEvent(attemptEvent);
            this.processDownstreamVertices(vertex);
        } else {
            boolean scheduled;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Attempting to schedule vertex: " + vertex.getLogIdentifier() + " due to schedule event");
            }
            if (scheduled = this.trySchedulingVertex(vertex)) {
                LOG.info("Scheduled vertex: " + vertex.getLogIdentifier());
                this.sendEventsForVertex(vertex.getName());
                this.sendEvent(attemptEvent);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Processing downstream vertices for vertex: " + vertex.getLogIdentifier());
                }
                this.processDownstreamVertices(vertex);
            } else {
                this.pendingEvents.put((Object)vertex.getName(), (Object)attemptEvent);
            }
        }
    }

    private void taskAttemptSeen(String vertexName, TezTaskAttemptID taskAttemptID) {
        BitSet scheduledTasks = this.vertexScheduledTasks.get(vertexName);
        if (scheduledTasks == null) {
            scheduledTasks = new BitSet();
            this.vertexScheduledTasks.put(vertexName, scheduledTasks);
        }
        if (taskAttemptID != null) {
            scheduledTasks.set(taskAttemptID.getTaskID().getId());
        }
    }

    private void sendEventsForVertex(String vertexName) {
        for (TaskAttemptEventSchedule event : this.pendingEvents.removeAll((Object)vertexName)) {
            this.sendEvent(event);
        }
    }

    private boolean vertexAlreadyScheduled(Vertex vertex) {
        return this.scheduledVertices.contains(vertex.getName());
    }

    private boolean scheduledTasksForwarded(Vertex vertex) {
        boolean canSchedule = false;
        BitSet scheduledTasks = this.vertexScheduledTasks.get(vertex.getName());
        if (scheduledTasks != null && scheduledTasks.cardinality() >= vertex.getTotalTasks()) {
            canSchedule = true;
        }
        return canSchedule;
    }

    private void processDownstreamVertices(Vertex vertex) {
        LinkedList newlyScheduledVertices = Lists.newLinkedList();
        Map<Vertex, Edge> outputVertexEdgeMap = vertex.getOutputVertices();
        for (Vertex destVertex : outputVertexEdgeMap.keySet()) {
            boolean scheduled;
            if (this.vertexAlreadyScheduled(destVertex)) continue;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Attempting to schedule vertex: " + destVertex.getLogIdentifier() + " due to upstream event from " + vertex.getLogIdentifier());
            }
            if (!(scheduled = this.trySchedulingVertex(destVertex))) continue;
            LOG.info("Scheduled vertex: " + destVertex.getLogIdentifier() + " due to upstream event from " + vertex.getLogIdentifier());
            this.sendEventsForVertex(destVertex.getName());
            newlyScheduledVertices.add(destVertex);
        }
        for (Vertex downStreamVertex : newlyScheduledVertices) {
            this.processDownstreamVertices(downStreamVertex);
        }
    }

    private boolean trySchedulingVertex(Vertex vertex) {
        boolean canSchedule;
        block7: {
            Map<Vertex, Edge> inputVertexEdgeMap;
            block8: {
                block6: {
                    canSchedule = true;
                    if (this.vertexScheduledTasks.get(vertex.getName()) != null) break block6;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("No schedule requests for vertex: " + vertex.getLogIdentifier() + ", Not scheduling");
                    }
                    canSchedule = false;
                    break block7;
                }
                inputVertexEdgeMap = vertex.getInputVertices();
                if (inputVertexEdgeMap != null && !inputVertexEdgeMap.isEmpty()) break block8;
                if (!LOG.isDebugEnabled()) break block7;
                LOG.debug("No sources for vertex: " + vertex.getLogIdentifier() + ", Scheduling now");
                break block7;
            }
            for (Vertex srcVertex : inputVertexEdgeMap.keySet()) {
                if (this.scheduledTasksForwarded(srcVertex)) {
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug("Trying to schedule: " + vertex.getLogIdentifier() + ", All tasks forwarded for srcVertex: " + srcVertex.getLogIdentifier() + ", count: " + srcVertex.getTotalTasks());
                    continue;
                }
                if (srcVertex.getTotalTasks() == 0) {
                    LOG.info("Vertex: " + srcVertex.getLogIdentifier() + " has 0 tasks. Marking as scheduled");
                    this.scheduledVertices.add(srcVertex.getName());
                    this.taskAttemptSeen(srcVertex.getName(), null);
                    continue;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Not all sources schedule requests complete while trying to schedule: " + vertex.getLogIdentifier() + ", For source vertex: " + srcVertex.getLogIdentifier() + ", Forwarded requests: " + (Serializable)(this.vertexScheduledTasks.get(srcVertex.getName()) == null ? "null(0)" : Integer.valueOf(this.vertexScheduledTasks.get(srcVertex.getName()).cardinality())) + " out of " + srcVertex.getTotalTasks());
                }
                canSchedule = false;
                break;
            }
        }
        if (canSchedule) {
            this.scheduledVertices.add(vertex.getName());
        }
        return canSchedule;
    }

    @Override
    public void taskCompletedEx(DAGEventSchedulerUpdate event) {
    }

    private void sendEvent(TaskAttemptEventSchedule event) {
        this.handler.handle((Event)event);
    }
}

