/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.sls;

import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NMRunner {
    private static final Logger LOG = LoggerFactory.getLogger(NMRunner.class);
    private int numNMs;
    private int numRacks;
    private Map<NodeId, NMSimulator> nmMap;
    private Resource nodeManagerResource;
    private String nodeFile;
    private TaskRunner taskRunner;
    private Configuration conf;
    private ResourceManager rm;
    private String tableMapping;
    private int threadPoolSize;
    private SLSRunner.TraceType inputType;
    private String[] inputTraces;
    private SynthTraceJobProducer stjp;

    public NMRunner(TaskRunner taskRunner, Configuration conf, ResourceManager rm, String tableMapping, int threadPoolSize) {
        this.taskRunner = taskRunner;
        this.conf = conf;
        this.rm = rm;
        this.tableMapping = tableMapping;
        this.threadPoolSize = threadPoolSize;
        this.nmMap = new ConcurrentHashMap<NodeId, NMSimulator>();
        this.nodeManagerResource = this.getNodeManagerResourceFromConf();
    }

    public void startNM() throws YarnException, IOException, InterruptedException {
        final int heartbeatInterval = this.conf.getInt("yarn.sls.nm.heartbeat.interval.ms", 1000);
        final float resourceUtilizationRatio = this.conf.getFloat("yarn.sls.nm.resource.utilization.ratio", -1.0f);
        Set<SLSRunner.NodeDetails> nodeSet = null;
        if (this.nodeFile.isEmpty()) {
            block5: for (String inputTrace : this.inputTraces) {
                switch (this.inputType) {
                    case SLS: {
                        nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
                        continue block5;
                    }
                    case RUMEN: {
                        nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
                        continue block5;
                    }
                    case SYNTH: {
                        this.stjp = new SynthTraceJobProducer(this.conf, new Path(this.inputTraces[0]));
                        nodeSet = SLSUtils.generateNodes(this.stjp.getNumNodes(), this.stjp.getNumNodes() / this.stjp.getNodesPerRack());
                        continue block5;
                    }
                    default: {
                        throw new YarnException("Input configuration not recognized, trace type should be SLS, RUMEN, or SYNTH");
                    }
                }
            }
        } else {
            nodeSet = SLSUtils.parseNodesFromNodeFile(this.nodeFile, this.nodeManagerResource);
        }
        if (nodeSet == null || nodeSet.isEmpty()) {
            throw new YarnException("No node! Please configure nodes.");
        }
        SLSUtils.generateNodeTableMapping(nodeSet, this.tableMapping);
        final Random random = new Random();
        final ConcurrentHashMap.KeySetView rackSet = ConcurrentHashMap.newKeySet();
        int threadPoolSize = Math.max(this.threadPoolSize, 10);
        ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
        for (final SLSRunner.NodeDetails nodeDetails : nodeSet) {
            executorService.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        NMSimulator nm = new NMSimulator();
                        Resource nmResource = NMRunner.this.nodeManagerResource;
                        String hostName = nodeDetails.getHostname();
                        if (nodeDetails.getNodeResource() != null) {
                            nmResource = nodeDetails.getNodeResource();
                        }
                        Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
                        nm.init(hostName, nmResource, random.nextInt(heartbeatInterval), heartbeatInterval, NMRunner.this.rm, resourceUtilizationRatio, nodeLabels);
                        NMRunner.this.nmMap.put(nm.getNode().getNodeID(), nm);
                        NMRunner.this.taskRunner.schedule(nm);
                        rackSet.add(nm.getNode().getRackName());
                    }
                    catch (IOException | YarnException e) {
                        LOG.error("Got an error while adding node", e);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(10L, TimeUnit.MINUTES);
        this.numRacks = rackSet.size();
        this.numNMs = this.nmMap.size();
    }

    void waitForNodesRunning() throws InterruptedException {
        long startTimeMS = System.currentTimeMillis();
        while (true) {
            int numRunningNodes = 0;
            for (RMNode node : this.rm.getRMContext().getRMNodes().values()) {
                if (node.getState() != NodeState.RUNNING) continue;
                ++numRunningNodes;
            }
            if (numRunningNodes == this.numNMs) break;
            LOG.info("SLSRunner is waiting for all nodes RUNNING. {} of {} NMs initialized.", (Object)numRunningNodes, (Object)this.numNMs);
            Thread.sleep(1000L);
        }
        LOG.info("SLSRunner takes {} ms to launch all nodes.", (Object)(System.currentTimeMillis() - startTimeMS));
    }

    private Resource getNodeManagerResourceFromConf() {
        ResourceInformation[] infors;
        Resource resource = Resources.createResource((int)0);
        for (ResourceInformation info : infors = ResourceUtils.getResourceTypesArray()) {
            long value = info.getName().equals("memory-mb") ? (long)this.conf.getInt("yarn.sls.nm.memory.mb", 10240) : (info.getName().equals("vcores") ? (long)this.conf.getInt("yarn.sls.nm.vcores", 10) : this.conf.getLong("yarn.sls.nm." + info.getName(), 0L));
            resource.setResourceValue(info.getName(), value);
        }
        return resource;
    }

    public void setNodeFile(String nodeFile) {
        this.nodeFile = nodeFile;
    }

    public void setInputType(SLSRunner.TraceType inputType) {
        this.inputType = inputType;
    }

    public void setInputTraces(String[] inputTraces) {
        this.inputTraces = (String[])inputTraces.clone();
    }

    public int getNumNMs() {
        return this.numNMs;
    }

    public int getNumRacks() {
        return this.numRacks;
    }

    public Resource getNodeManagerResource() {
        return this.nodeManagerResource;
    }

    public Map<NodeId, NMSimulator> getNmMap() {
        return this.nmMap;
    }

    public SynthTraceJobProducer getStjp() {
        return this.stjp;
    }

    public void setTableMapping(String tableMapping) {
        this.tableMapping = tableMapping;
    }

    public void setRm(ResourceManager rm) {
        this.rm = rm;
    }
}

