/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VertexManagerWithConcurrentInput
extends VertexManagerPlugin {
    private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
    private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
    private int managedTasks;
    private AtomicBoolean tasksScheduled = new AtomicBoolean(false);
    private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
    private Configuration vertexConfig;
    private String vertexName;
    private EdgeProperty.ConcurrentEdgeTriggerType edgeTriggerType;
    private volatile boolean allSrcVerticesConfigured;
    int completedUpstreamTasks;

    public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
        super(context);
    }

    public void initialize() {
        UserPayload userPayload = this.getContext().getUserPayload();
        if (userPayload == null || userPayload.getPayload() == null || userPayload.getPayload().limit() == 0) {
            throw new TezUncheckedException("Could not initialize VertexManagerWithConcurrentInput from provided user payload");
        }
        this.managedTasks = this.getContext().getVertexNumTasks(this.getContext().getVertexName());
        Map edges = this.getContext().getInputVertexEdgeProperties();
        for (Map.Entry entry : edges.entrySet()) {
            if (!EdgeProperty.SchedulingType.CONCURRENT.equals((Object)((EdgeProperty)entry.getValue()).getSchedulingType())) {
                throw new TezUncheckedException("All input edges to vertex " + this.vertexName + "  must be CONCURRENT.");
            }
            String srcVertex = (String)entry.getKey();
            this.srcVerticesConfigured.put(srcVertex, false);
            this.getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
        }
        try {
            this.vertexConfig = TezUtils.createConfFromUserPayload((UserPayload)this.getContext().getUserPayload());
        }
        catch (IOException e) {
            throw new TezUncheckedException((Throwable)e);
        }
        this.edgeTriggerType = EdgeProperty.ConcurrentEdgeTriggerType.valueOf((String)this.vertexConfig.get("tez.task.concurrent.edge.trigger.type", TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
        if (!EdgeProperty.ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals((Object)this.edgeTriggerType)) {
            throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
        }
        LOG.info("VertexManagerWithConcurrentInput initialized with edgeTriggerType {}.", (Object)this.edgeTriggerType);
        this.vertexName = this.getContext().getVertexName();
        this.completedUpstreamTasks = 0;
    }

    public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
        this.onVertexStartedDone.set(true);
        this.scheduleTasks();
    }

    public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
        VertexState state = stateUpdate.getVertexState();
        String fromVertex = stateUpdate.getVertexName();
        if (!this.srcVerticesConfigured.containsKey(fromVertex)) {
            throw new IllegalArgumentException("Not expecting state update from vertex:" + fromVertex + " in vertex: " + this.vertexName);
        }
        if (!VertexState.CONFIGURED.equals((Object)state)) {
            throw new IllegalArgumentException("Received incorrect state notification : " + state + " from vertex: " + fromVertex + " in vertex: " + this.vertexName);
        }
        LOG.info("Received configured notification: " + state + " for vertex: " + fromVertex + " in vertex: " + this.vertexName);
        this.srcVerticesConfigured.put(fromVertex, true);
        boolean checkAllSrcVerticesConfigured = true;
        for (Map.Entry<String, Boolean> entry : this.srcVerticesConfigured.entrySet()) {
            if (entry.getValue().booleanValue()) continue;
            LOG.info("Waiting for vertex {} in vertex {} ", (Object)entry.getKey(), (Object)this.vertexName);
            checkAllSrcVerticesConfigured = false;
            break;
        }
        this.allSrcVerticesConfigured = checkAllSrcVerticesConfigured;
        this.scheduleTasks();
    }

    public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
        ++this.completedUpstreamTasks;
        LOG.info("Source task attempt {} completion received at vertex {}", (Object)attempt, (Object)this.vertexName);
    }

    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
    }

    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
    }

    private void scheduleTasks() {
        if (!this.onVertexStartedDone.get()) {
            return;
        }
        if (this.tasksScheduled.get()) {
            return;
        }
        if (!this.canScheduleTasks()) {
            return;
        }
        this.tasksScheduled.compareAndSet(false, true);
        ArrayList tasksToStart = Lists.newArrayListWithCapacity((int)this.managedTasks);
        for (int i = 0; i < this.managedTasks; ++i) {
            tasksToStart.add(VertexManagerPluginContext.ScheduleTaskRequest.create((int)i, null));
        }
        if (!tasksToStart.isEmpty()) {
            LOG.info("Starting {} tasks in {}.", (Object)tasksToStart.size(), (Object)this.vertexName);
            this.getContext().scheduleTasks((List)tasksToStart);
        }
    }

    private boolean canScheduleTasks() {
        if (this.edgeTriggerType.equals((Object)EdgeProperty.ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED)) {
            return this.allSrcVerticesConfigured;
        }
        throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
    }

    public static ConcurrentInputVertexManagerConfigBuilder createConfigBuilder(@Nullable Configuration conf) {
        return new ConcurrentInputVertexManagerConfigBuilder(conf);
    }

    public static final class ConcurrentInputVertexManagerConfigBuilder {
        private final Configuration conf;

        private ConcurrentInputVertexManagerConfigBuilder(@Nullable Configuration conf) {
            this.conf = conf == null ? new Configuration(false) : conf;
        }

        public VertexManagerPluginDescriptor build() {
            VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create((String)VertexManagerWithConcurrentInput.class.getName());
            try {
                return (VertexManagerPluginDescriptor)desc.setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)this.conf));
            }
            catch (IOException e) {
                throw new TezUncheckedException((Throwable)e);
            }
        }
    }
}

