/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.analyzer.plugins;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;
import java.util.LinkedList;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.analyzer.plugins.TezAnalyzerBase;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.TaskInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

public class SlowestVertexAnalyzer
extends TezAnalyzerBase
implements Analyzer {
    private static final String[] headers = new String[]{"vertexName", "taskAttempts", "totalTime", "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom", "75thPercentile", "95thPercentile", "98thPercentile", "Median", "observation", "comments"};
    private final CSVResult csvResult = new CSVResult(headers);
    private final MetricRegistry metrics = new MetricRegistry();
    private Histogram taskAttemptRuntimeHistorgram;
    private static final String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime";
    private static final long MAX_VERTEX_RUNTIME_DEFAULT = 100000L;
    private final long vertexRuntimeThreshold;

    public SlowestVertexAnalyzer(Configuration config) {
        super(config);
        this.vertexRuntimeThreshold = Math.max(1L, config.getLong(MAX_VERTEX_RUNTIME, 100000L));
    }

    private long getTaskRuntime(VertexInfo vertexInfo) {
        TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart();
        TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish();
        DagInfo dagInfo = vertexInfo.getDagInfo();
        long totalTime = (lastTaskToFinish == null ? dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) - (firstTaskToStart == null ? dagInfo.getStartTime() : firstTaskToStart.getStartTime());
        return totalTime;
    }

    @Override
    public void analyze(DagInfo dagInfo) throws TezException {
        for (VertexInfo vertexInfo : dagInfo.getVertices()) {
            String vertexName = vertexInfo.getVertexName();
            if (vertexInfo.getFirstTaskToStart() == null || vertexInfo.getLastTaskToFinish() == null) continue;
            long totalTime = this.getTaskRuntime(vertexInfo);
            long slowestLastEventTime = Long.MIN_VALUE;
            String maxSourceName = "";
            this.taskAttemptRuntimeHistorgram = this.metrics.histogram(vertexName);
            for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
                this.taskAttemptRuntimeHistorgram.update(attemptInfo.getTimeTaken());
                Map lastEventReceivedMap = attemptInfo.getCounter(TaskCounter.LAST_EVENT_RECEIVED.toString());
                for (Map.Entry entry : lastEventReceivedMap.entrySet()) {
                    if (((String)entry.getKey()).equals(TaskCounter.class.getName()) || ((TezCounter)entry.getValue()).getValue() <= slowestLastEventTime) continue;
                    slowestLastEventTime = ((TezCounter)entry.getValue()).getValue();
                    maxSourceName = (String)entry.getKey();
                }
            }
            long shuffleMax = Long.MIN_VALUE;
            String shuffleMaxSource = "";
            for (TaskAttemptInfo taskAttemptInfo : vertexInfo.getTaskAttempts()) {
                Map lastEventReceivedMap = taskAttemptInfo.getCounter(TaskCounter.SHUFFLE_PHASE_TIME.toString());
                for (Map.Entry entry : lastEventReceivedMap.entrySet()) {
                    if (((String)entry.getKey()).equals(TaskCounter.class.getName()) || ((TezCounter)entry.getValue()).getValue() <= shuffleMax) continue;
                    shuffleMax = ((TezCounter)entry.getValue()).getValue();
                    shuffleMaxSource = (String)entry.getKey();
                }
            }
            Object comments = "";
            LinkedList linkedList = Lists.newLinkedList();
            linkedList.add(vertexName);
            linkedList.add("" + vertexInfo.getTaskAttempts().size());
            linkedList.add("" + totalTime);
            linkedList.add("" + Math.max(0L, shuffleMax));
            linkedList.add(shuffleMaxSource);
            linkedList.add("" + Math.max(0L, slowestLastEventTime));
            linkedList.add(maxSourceName);
            StringBuilder sb = new StringBuilder();
            double percentile75 = this.taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile();
            double percentile95 = this.taskAttemptRuntimeHistorgram.getSnapshot().get95thPercentile();
            double percentile98 = this.taskAttemptRuntimeHistorgram.getSnapshot().get98thPercentile();
            double percentile99 = this.taskAttemptRuntimeHistorgram.getSnapshot().get99thPercentile();
            double medianAttemptRuntime = this.taskAttemptRuntimeHistorgram.getSnapshot().getMedian();
            linkedList.add("75th=" + percentile75);
            linkedList.add("95th=" + percentile95);
            linkedList.add("98th=" + percentile98);
            linkedList.add("median=" + medianAttemptRuntime);
            if (percentile75 / percentile99 < 0.5) {
                sb.append("Looks like some straggler task is there");
            }
            linkedList.add(sb.toString());
            if (totalTime > 0L && vertexInfo.getTaskAttempts().size() > 0) {
                if ((double)((float)shuffleMax * 1.0f / (float)totalTime) > 0.5) {
                    comments = (double)((float)slowestLastEventTime * 1.0f / (float)totalTime) > 0.5 ? "This vertex is slow due to its dependency on parent. Got a lot delayed last event received" : "Spending too much time on shuffle. Check shuffle bytes from previous vertex";
                } else if (totalTime > this.vertexRuntimeThreshold) {
                    comments = "Concentrate on this vertex (totalTime > " + this.vertexRuntimeThreshold + " seconds)";
                }
            }
            linkedList.add(comments);
            this.csvResult.addRecord(linkedList.toArray(new String[linkedList.size()]));
        }
    }

    @Override
    public CSVResult getResult() throws TezException {
        return this.csvResult;
    }

    @Override
    public String getName() {
        return "SlowVertexAnalyzer";
    }

    @Override
    public String getDescription() {
        return "Identify the slowest vertex in the DAG, which needs to be looked into first";
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(config);
        int res = ToolRunner.run((Configuration)config, (Tool)analyzer, (String[])args);
        analyzer.printResults();
        System.exit(res);
    }
}

