package org.apache.hadoop.mapreduce.v2.app.speculate;

import com.google.common.annotations.VisibleForTesting;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;

/* loaded from: input_file:hadoop-client-2.7.0-mapr-1803-r1/share/hadoop/client/lib/hadoop-mapreduce-client-app-2.7.0-mapr-1803-r1.jar:org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.class */
public class DefaultSpeculator extends AbstractService implements Speculator {
    // Sentinel values returned by speculationValue(). They are all far below
    // zero, so they never beat the "best so far" value (which starts at -1)
    // when maybeScheduleASpeculation() picks a task.

    // Returned when the estimator's thresholdRuntime() for the task is Long.MAX_VALUE.
    private static final long ON_SCHEDULE = Long.MIN_VALUE;
    // Returned when more than one attempt of the task is RUNNING or STARTING.
    private static final long ALREADY_SPECULATING = -9223372036854775807L;
    // Returned when the attempt's enrolled time is later than the scan time.
    private static final long TOO_NEW = -9223372036854775806L;
    // Returned when the attempt's estimated end time is already before the scan time.
    private static final long PROGRESS_IS_GOOD = -9223372036854775805L;
    // Returned when the task has no RUNNING or STARTING attempt.
    private static final long NOT_RUNNING = -9223372036854775804L;
    // Returned when a fresh attempt is estimated to end no earlier than the current one.
    private static final long TOO_LATE_TO_SPECULATE = -9223372036854775803L;
    // Minimum wait (ms) before the next scan when the last scan launched nothing.
    private long soonestRetryAfterNoSpeculate;
    // Minimum wait (ms) before the next scan when the last scan launched a speculation.
    private long soonestRetryAfterSpeculate;
    // Fraction of the counted (not NOT_RUNNING) tasks that may be speculating at once.
    private double proportionRunningTasksSpeculatable;
    // Fraction of all tasks of a type that may be speculating at once.
    private double proportionTotalTasksSpeculatable;
    // Floor applied to the speculation cap regardless of the two proportions above.
    private int minimumAllowedSpeculativeTasks;
    private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
    // Tasks whose most recent status update reported a RUNNING attempt.
    private final ConcurrentMap<TaskId, Boolean> runningTasks;
    // Last observed estimate/progress per attempt, maintained by speculationValue().
    private final ConcurrentMap<TaskAttemptId, TaskAttemptHistoryStatistics> runningTaskAttemptStatistics;
    // An attempt whose estimate and progress have not changed for longer than
    // this many milliseconds gets a synthetic status update pushed for it.
    private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9000;
    // Per-job counters adjusted by TASK_CONTAINER_NEED_UPDATE events; a job is
    // only scanned for speculation when its counter is <= 0.
    private final ConcurrentMap<JobId, AtomicInteger> mapContainerNeeds;
    private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds;
    // Tasks for which addSpeculativeAttempt() has already been called.
    private final Set<TaskId> mayHaveSpeculated;
    private final Configuration conf;
    private AppContext context;
    // Thread running the periodic scan loop; created in serviceStart().
    private Thread speculationBackgroundThread;
    // Set by serviceStop(); volatile so the background loop observes it.
    private volatile boolean stopped;
    // NOTE(review): nothing in this file enqueues into eventQueue; it is only
    // inspected by eventQueueEmpty().
    private BlockingQueue<SpeculatorEvent> eventQueue;
    private TaskRuntimeEstimator estimator;
    // The scan loop polls this queue as its sleep; scanForSpeculations() adds
    // an element to wake the loop early.
    private BlockingQueue<Object> scanControl;
    private final Clock clock;
    private final EventHandler<TaskEvent> eventHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:hadoop-client-2.7.0-mapr-1803-r1/share/hadoop/client/lib/hadoop-mapreduce-client-app-2.7.0-mapr-1803-r1.jar:org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator$TaskAttemptHistoryStatistics.class */
    /**
     * Snapshot of the last estimated runtime and progress observed for one
     * task attempt, together with the time at which that snapshot was last
     * refreshed.
     */
    public static class TaskAttemptHistoryStatistics {
        private long estimatedRunTime;
        private float progress;
        private long lastHeartBeatTime;

        /**
         * @param j  estimated runtime of the attempt
         * @param f  progress of the attempt
         * @param j2 time to record as the last heartbeat
         */
        public TaskAttemptHistoryStatistics(long j, float f, long j2) {
            estimatedRunTime = j;
            progress = f;
            resetHeartBeatTime(j2);
        }

        /** @return the stored estimated runtime */
        public long getEstimatedRunTime() {
            return estimatedRunTime;
        }

        /** @return the stored progress */
        public float getProgress() {
            return progress;
        }

        /** @param j new estimated runtime to store */
        public void setEstimatedRunTime(long j) {
            estimatedRunTime = j;
        }

        /** @param f new progress to store */
        public void setProgress(float f) {
            progress = f;
        }

        /**
         * Reports whether more than
         * {@code MAX_WAITTING_TIME_FOR_HEARTBEAT} has elapsed since the last
         * recorded heartbeat. When it has, the heartbeat time is reset to
         * {@code j}, so the next positive answer is at least one full window
         * away.
         *
         * @param j the current time
         * @return {@code true} if the heartbeat window was exceeded
         */
        public boolean notHeartbeatedInAWhile(long j) {
            boolean overdue = j - lastHeartBeatTime > DefaultSpeculator.MAX_WAITTING_TIME_FOR_HEARTBEAT;
            if (overdue) {
                resetHeartBeatTime(j);
            }
            return overdue;
        }

        /** @param j time to record as the last heartbeat */
        public void resetHeartBeatTime(long j) {
            lastHeartBeatTime = j;
        }
    }

    /**
     * Creates a speculator that uses the clock supplied by the application
     * context.
     *
     * @param configuration job configuration to read speculation settings from
     * @param appContext    application context; also the source of the clock
     */
    public DefaultSpeculator(Configuration configuration, AppContext appContext) {
        this(configuration, appContext, appContext.getClock());
    }

    /**
     * Creates a speculator with an explicit clock; the runtime estimator is
     * instantiated from the configuration via {@code getEstimator}.
     *
     * @param configuration job configuration to read speculation settings from
     * @param appContext    application context
     * @param clock         time source used for all scan timing
     */
    public DefaultSpeculator(Configuration configuration, AppContext appContext, Clock clock) {
        this(configuration, appContext, getEstimator(configuration, appContext), clock);
    }

    /**
     * Instantiates the estimator class configured under
     * {@link MRJobConfig#MR_AM_TASK_ESTIMATOR} (falling back to
     * {@link LegacyTaskRuntimeEstimator}) through its no-argument constructor
     * and contextualizes it.
     *
     * @param configuration configuration naming the estimator class
     * @param appContext    context handed to the new estimator
     * @return the contextualized estimator
     * @throws YarnRuntimeException if the estimator cannot be constructed reflectively
     */
    private static TaskRuntimeEstimator getEstimator(Configuration configuration, AppContext appContext) {
        Class<? extends TaskRuntimeEstimator> estimatorClass = configuration.getClass(
                MRJobConfig.MR_AM_TASK_ESTIMATOR, LegacyTaskRuntimeEstimator.class, TaskRuntimeEstimator.class);
        try {
            TaskRuntimeEstimator estimator = estimatorClass.getConstructor().newInstance();
            estimator.contextualize(configuration, appContext);
            return estimator;
        } catch (InstantiationException e) {
            throw estimatorCreationFailure(e);
        } catch (IllegalAccessException e) {
            throw estimatorCreationFailure(e);
        } catch (InvocationTargetException e) {
            throw estimatorCreationFailure(e);
        } catch (NoSuchMethodException e) {
            throw estimatorCreationFailure(e);
        }
    }

    /** Logs a reflective construction failure and wraps it for rethrowing. */
    private static YarnRuntimeException estimatorCreationFailure(Exception cause) {
        LOG.error("Can't make a speculation runtime estimator", cause);
        return new YarnRuntimeException(cause);
    }

    /**
     * Creates a speculator with an explicit estimator and clock, and reads the
     * speculation tuning parameters from the configuration.
     *
     * <p>Defaults applied when a key is absent: 1000 for the retry interval
     * after a scan that launched nothing, 15000 for the retry interval after a
     * scan that launched a speculation, 0.1 for the running-task proportion,
     * 0.01 for the total-task proportion, and 10 for the minimum allowed
     * speculative tasks.
     *
     * @param configuration        job configuration to read speculation settings from
     * @param appContext           application context; also the source of the task event handler
     * @param taskRuntimeEstimator estimator consulted for runtimes and thresholds
     * @param clock                time source used for all scan timing
     */
    public DefaultSpeculator(Configuration configuration, AppContext appContext, TaskRuntimeEstimator taskRuntimeEstimator, Clock clock) {
        super(DefaultSpeculator.class.getName());
        // Explicit type arguments instead of raw types, so the assignments are
        // checked by the compiler rather than relying on unchecked conversion.
        this.runningTasks = new ConcurrentHashMap<TaskId, Boolean>();
        this.runningTaskAttemptStatistics = new ConcurrentHashMap<TaskAttemptId, TaskAttemptHistoryStatistics>();
        this.mapContainerNeeds = new ConcurrentHashMap<JobId, AtomicInteger>();
        this.reduceContainerNeeds = new ConcurrentHashMap<JobId, AtomicInteger>();
        this.mayHaveSpeculated = new HashSet<TaskId>();
        this.speculationBackgroundThread = null;
        this.stopped = false;
        this.eventQueue = new LinkedBlockingQueue<SpeculatorEvent>();
        this.scanControl = new LinkedBlockingQueue<Object>();
        this.conf = configuration;
        this.context = appContext;
        this.estimator = taskRuntimeEstimator;
        this.clock = clock;
        this.eventHandler = appContext.getEventHandler();
        this.soonestRetryAfterNoSpeculate = configuration.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 1000L);
        this.soonestRetryAfterSpeculate = configuration.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 15000L);
        this.proportionRunningTasksSpeculatable = configuration.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.1d);
        this.proportionTotalTasksSpeculatable = configuration.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.01d);
        this.minimumAllowedSpeculativeTasks = configuration.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 10);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the background thread that repeatedly scans for speculation
     * opportunities.
     *
     * <p>After each scan the thread waits on {@code scanControl} for at least
     * the configured retry interval (the "after speculate" one if the scan
     * launched anything, otherwise the "after no speculate" one), or for as
     * long as the scan itself took if that was longer. An element added to
     * {@code scanControl} ends the wait early.
     */
    @Override
    public void serviceStart() throws Exception {
        Runnable scanLoop = new Runnable() {
            @Override
            public void run() {
                while (!stopped && !Thread.currentThread().isInterrupted()) {
                    long scanStartTime = clock.getTime();
                    try {
                        int launched = computeSpeculations();
                        long minimumWait = launched > 0 ? soonestRetryAfterSpeculate : soonestRetryAfterNoSpeculate;
                        long wait = Math.max(minimumWait, clock.getTime() - scanStartTime);
                        if (launched > 0) {
                            LOG.info("We launched " + launched + " speculations.  Sleeping " + wait + " milliseconds.");
                        }
                        // The polled element carries no information; the poll is a wakeable sleep.
                        scanControl.poll(wait, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // An interrupt during shutdown is expected; only report unexpected ones.
                        if (!stopped) {
                            LOG.error("Background thread returning, interrupted", e);
                        }
                        return;
                    }
                }
            }
        };
        speculationBackgroundThread = new Thread(scanLoop, "DefaultSpeculator background processing");
        speculationBackgroundThread.start();
        super.serviceStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the background scan thread. The {@code stopped} flag is raised
     * before the interrupt so the scan loop treats the interruption as a
     * normal shutdown rather than logging it as an error.
     */
    @Override
    public void serviceStop() throws Exception {
        stopped = true;
        Thread scanner = speculationBackgroundThread;
        if (scanner != null) {
            scanner.interrupt();
        }
        super.serviceStop();
    }

    /**
     * Applies an attempt status report immediately, timestamped with the
     * current clock time.
     *
     * @param taskAttemptStatus the reported attempt status
     */
    @Override
    public void handleAttempt(TaskAttemptStatusUpdateEvent.TaskAttemptStatus taskAttemptStatus) {
        long now = clock.getTime();
        statusUpdate(taskAttemptStatus, now);
    }

    /** @return {@code true} if the internal event queue currently holds no events */
    public boolean eventQueueEmpty() {
        boolean empty = eventQueue.isEmpty();
        return empty;
    }

    /**
     * Requests an immediate debug speculation scan by adding a token to
     * {@code scanControl}, which ends the background thread's current wait.
     *
     * <p>Diagnostics go through the class logger only; the previous
     * {@code System.out} output duplicated the log line and bypassed log
     * configuration.
     */
    public void scanForSpeculations() {
        LOG.info("We got asked to run a debug speculation scan.");
        LOG.info("There are " + scanControl.size() + " events stacked already.");
        scanControl.add(new Object());
        // Give the background thread a chance to pick up the token.
        Thread.yield();
    }

    /**
     * Returns the container-need counter for the job that owns the given task,
     * creating it with an initial value of 0 on first use. Map tasks and
     * reduce tasks are tracked in separate per-job maps.
     *
     * @param taskId task whose job and type select the counter
     * @return the shared counter for that job and task type; never {@code null}
     */
    private AtomicInteger containerNeed(TaskId taskId) {
        JobId jobId = taskId.getJobId();
        ConcurrentMap<JobId, AtomicInteger> needsByJob =
                taskId.getTaskType() == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
        AtomicInteger counter = needsByJob.get(jobId);
        if (counter != null) {
            return counter;
        }
        AtomicInteger fresh = new AtomicInteger(0);
        // putIfAbsent returns the value another thread installed first, or
        // null if ours won; using it avoids a second lookup.
        AtomicInteger winner = needsByJob.putIfAbsent(jobId, fresh);
        return winner != null ? winner : fresh;
    }

    /**
     * Dispatches a speculator event by type:
     * status updates are applied, container-need changes are added to the
     * per-job counter, new attempts are enrolled with the estimator, and job
     * creation re-contextualizes the estimator. Other event types are ignored.
     *
     * @param speculatorEvent the event to process
     */
    private synchronized void processSpeculatorEvent(SpeculatorEvent speculatorEvent) {
        switch (speculatorEvent.getType()) {
            case ATTEMPT_STATUS_UPDATE:
                statusUpdate(speculatorEvent.getReportedStatus(), speculatorEvent.getTimestamp());
                break;
            case TASK_CONTAINER_NEED_UPDATE: {
                AtomicInteger need = containerNeed(speculatorEvent.getTaskID());
                need.addAndGet(speculatorEvent.containersNeededChange());
                break;
            }
            case ATTEMPT_START:
                LOG.info("ATTEMPT_START " + speculatorEvent.getTaskID());
                estimator.enrollAttempt(speculatorEvent.getReportedStatus(), speculatorEvent.getTimestamp());
                break;
            case JOB_CREATE:
                LOG.info("JOB_CREATE " + speculatorEvent.getJobID());
                estimator.contextualize(getConfig(), context);
                break;
            default:
                break;
        }
    }

    /**
     * Applies one attempt status report: forwards it to the estimator and
     * keeps the running-task bookkeeping in step with the reported state.
     * Reports for a job or task that the context no longer knows are dropped.
     *
     * @param taskAttemptStatus the reported attempt status
     * @param j                 timestamp associated with the report
     */
    protected void statusUpdate(TaskAttemptStatusUpdateEvent.TaskAttemptStatus taskAttemptStatus, long j) {
        String stateName = taskAttemptStatus.taskState.toString();
        TaskAttemptId attemptId = taskAttemptStatus.id;
        TaskId owningTask = attemptId.getTaskId();

        Job job = context.getJob(owningTask.getJobId());
        if (job == null) {
            return;
        }
        if (job.getTask(owningTask) == null) {
            return;
        }

        estimator.updateAttempt(taskAttemptStatus, j);

        if (stateName.equals(TaskAttemptState.RUNNING.name())) {
            runningTasks.putIfAbsent(owningTask, Boolean.TRUE);
        } else {
            runningTasks.remove(owningTask, Boolean.TRUE);
            // A STARTING attempt keeps its statistics; any other non-running
            // state discards them.
            if (!stateName.equals(TaskAttemptState.STARTING.name())) {
                runningTaskAttemptStatistics.remove(attemptId);
            }
        }
    }

    /**
     * Scores how much a task would benefit from a speculative attempt.
     *
     * <p>A real score is the estimated end time of the single active attempt
     * minus the estimated end time of a replacement started now. Otherwise
     * one of the sentinel constants is returned: {@code ON_SCHEDULE},
     * {@code ALREADY_SPECULATING}, {@code TOO_NEW}, {@code PROGRESS_IS_GOOD},
     * {@code TOO_LATE_TO_SPECULATE} or {@code NOT_RUNNING}.
     *
     * <p>As a side effect, the per-attempt statistics are refreshed, and a
     * synthetic status update is issued for an attempt whose estimate and
     * progress have been unchanged for longer than the heartbeat window.
     *
     * @param taskId task to evaluate
     * @param j      the scan time
     * @return the speculation score or a sentinel
     */
    private long speculationValue(TaskId taskId, long j) {
        final long now = j;
        Job job = context.getJob(taskId.getJobId());
        Task task = job.getTask(taskId);
        Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();

        long acceptableRuntime = Long.MIN_VALUE;
        long result = Long.MIN_VALUE;
        if (!mayHaveSpeculated.contains(taskId)) {
            acceptableRuntime = estimator.thresholdRuntime(taskId);
            if (acceptableRuntime == Long.MAX_VALUE) {
                return ON_SCHEDULE;
            }
        }

        int activeAttempts = 0;
        for (TaskAttempt attempt : attempts.values()) {
            if (!(attempt.getState() == TaskAttemptState.RUNNING || attempt.getState() == TaskAttemptState.STARTING)) {
                continue;
            }
            activeAttempts++;
            if (activeAttempts > 1) {
                return ALREADY_SPECULATING;
            }

            TaskAttemptId attemptId = attempt.getID();
            long estimatedRuntime = estimator.estimatedRuntime(attemptId);
            long startTime = estimator.attemptEnrolledTime(attemptId);
            if (startTime > now) {
                return TOO_NEW;
            }
            long estimatedEndTime = estimatedRuntime + startTime;
            long estimatedReplacementEndTime = now + estimator.estimatedNewAttemptRuntime(taskId);
            float progress = attempt.getProgress();

            recordAttemptObservation(attempt, attemptId, estimatedRuntime, progress, now);

            if (estimatedEndTime < now) {
                return PROGRESS_IS_GOOD;
            }
            if (estimatedReplacementEndTime >= estimatedEndTime) {
                return TOO_LATE_TO_SPECULATE;
            }
            result = estimatedEndTime - estimatedReplacementEndTime;
        }

        // Reaching here means at most one attempt was active.
        if (activeAttempts == 0) {
            return NOT_RUNNING;
        }
        // The threshold was skipped above for tasks already speculated once;
        // consult it now.
        if (acceptableRuntime == Long.MIN_VALUE && estimator.thresholdRuntime(taskId) == Long.MAX_VALUE) {
            return ON_SCHEDULE;
        }
        return result;
    }

    /**
     * Updates the stored statistics for an active attempt, or, when nothing
     * has changed for longer than the heartbeat window, feeds a synthetic
     * status update for it through {@code handleAttempt}.
     */
    private void recordAttemptObservation(TaskAttempt attempt, TaskAttemptId attemptId,
            long estimatedRuntime, float progress, long now) {
        TaskAttemptHistoryStatistics history = runningTaskAttemptStatistics.get(attemptId);
        if (history == null) {
            runningTaskAttemptStatistics.put(attemptId, new TaskAttemptHistoryStatistics(estimatedRuntime, progress, now));
            return;
        }
        boolean changed = estimatedRuntime != history.getEstimatedRunTime() || progress != history.getProgress();
        if (changed) {
            history.setEstimatedRunTime(estimatedRuntime);
            history.setProgress(progress);
            history.resetHeartBeatTime(now);
            return;
        }
        if (history.notHeartbeatedInAWhile(now)) {
            TaskAttemptStatusUpdateEvent.TaskAttemptStatus syntheticStatus = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
            syntheticStatus.id = attemptId;
            syntheticStatus.progress = progress;
            syntheticStatus.taskState = attempt.getState();
            handleAttempt(syntheticStatus);
        }
    }

    /**
     * Asks the task to launch a speculative attempt by sending it a
     * {@code T_ADD_SPEC_ATTEMPT} event, and remembers that the task has been
     * speculated on.
     *
     * @param taskId task to speculate
     */
    protected void addSpeculativeAttempt(TaskId taskId) {
        LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskId);
        TaskEvent speculateRequest = new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT);
        eventHandler.handle(speculateRequest);
        mayHaveSpeculated.add(taskId);
    }

    /**
     * Event-handler entry point; processes the event synchronously on the
     * calling thread.
     *
     * @param speculatorEvent the event to process
     */
    @Override
    public void handle(SpeculatorEvent speculatorEvent) {
        processSpeculatorEvent(speculatorEvent);
    }

    /** @return the number of map speculations launched by this scan */
    private int maybeScheduleAMapSpeculation() {
        int launched = maybeScheduleASpeculation(TaskType.MAP);
        return launched;
    }

    /** @return the number of reduce speculations launched by this scan */
    private int maybeScheduleAReduceSpeculation() {
        int launched = maybeScheduleASpeculation(TaskType.REDUCE);
        return launched;
    }

    /**
     * For every job that currently needs no more containers of the given
     * type, picks the task with the highest speculation value and launches a
     * speculative attempt for it, provided the number of tasks already
     * speculating is below the allowed cap.
     *
     * <p>The cap is the largest of: the configured minimum, the configured
     * proportion of all tasks of this type, and the configured proportion of
     * the tasks whose speculation value is not {@code NOT_RUNNING}.
     *
     * <p>A job that the context no longer knows is skipped, matching the
     * null handling in {@code statusUpdate}; without the guard a
     * {@code NullPointerException} would terminate the background scan thread.
     *
     * @param taskType the task type to scan
     * @return the number of speculative attempts launched (at most one per job)
     */
    private int maybeScheduleASpeculation(TaskType taskType) {
        int speculationsLaunched = 0;
        long now = clock.getTime();
        ConcurrentMap<JobId, AtomicInteger> containerNeeds =
                taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;

        for (Map.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
            // Only speculate for jobs that are not waiting on containers of this type.
            if (jobEntry.getValue().get() > 0) {
                continue;
            }
            Job job = context.getJob(jobEntry.getKey());
            if (job == null) {
                continue;
            }

            Map<TaskId, Task> tasks = job.getTasks(taskType);
            int allowedByTotal = (int) Math.max(minimumAllowedSpeculativeTasks,
                    proportionTotalTasksSpeculatable * tasks.size());

            int alreadySpeculating = 0;
            int countedTasks = 0;
            TaskId bestTask = null;
            // Sentinel values are all below -1, so only real scores can win.
            long bestValue = -1;
            for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {
                long value = speculationValue(taskEntry.getKey(), now);
                if (value == ALREADY_SPECULATING) {
                    alreadySpeculating++;
                }
                if (value != NOT_RUNNING) {
                    countedTasks++;
                }
                if (value > bestValue) {
                    bestTask = taskEntry.getKey();
                    bestValue = value;
                }
            }

            int allowed = (int) Math.max(allowedByTotal, proportionRunningTasksSpeculatable * countedTasks);
            if (bestTask != null && allowed > alreadySpeculating) {
                addSpeculativeAttempt(bestTask);
                speculationsLaunched++;
            }
        }
        return speculationsLaunched;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs one speculation scan: map tasks first, then reduce tasks.
     *
     * @return the total number of speculative attempts launched
     */
    public int computeSpeculations() {
        int mapSpeculations = maybeScheduleAMapSpeculation();
        int reduceSpeculations = maybeScheduleAReduceSpeculation();
        return mapSpeculations + reduceSpeculations;
    }

    /** @return the minimum wait before rescanning after a scan that launched nothing */
    @VisibleForTesting
    public long getSoonestRetryAfterNoSpeculate() {
        return soonestRetryAfterNoSpeculate;
    }

    /** @return the minimum wait before rescanning after a scan that launched a speculation */
    @VisibleForTesting
    public long getSoonestRetryAfterSpeculate() {
        return soonestRetryAfterSpeculate;
    }

    /** @return the configured proportion of running tasks that may be speculated */
    @VisibleForTesting
    public double getProportionRunningTasksSpeculatable() {
        return proportionRunningTasksSpeculatable;
    }

    /** @return the configured proportion of all tasks that may be speculated */
    @VisibleForTesting
    public double getProportionTotalTasksSpeculatable() {
        return proportionTotalTasksSpeculatable;
    }

    /** @return the configured floor on the number of allowed speculative tasks */
    @VisibleForTesting
    public int getMinimumAllowedSpeculativeTasks() {
        return minimumAllowedSpeculativeTasks;
    }
}
