/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import com.google.common.base.Joiner;
import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockClock;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.MockTezClient;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
import org.apache.tez.dag.app.dag.speculation.legacy.LegacyTaskRuntimeEstimator;
import org.apache.tez.dag.app.dag.speculation.legacy.SimpleExponentialTaskRuntimeEstimator;
import org.apache.tez.dag.app.dag.speculation.legacy.TaskRuntimeEstimator;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class TestSpeculation {
    /** Logger for this test class (previously created for an unrelated class, which mislabelled its output). */
    private static final Logger LOG = LoggerFactory.getLogger(TestSpeculation.class);
    /** Message of the attempt-count assertion; {@link RetryRule} matches on it to decide whether a failure is retryable. */
    private static final String ASSERT_SPECULATIONS_COUNT_MSG = "Number of attempts after Speculation should be two";
    /** Message fragment {@link RetryRule} looks for in a thrown {@link Exception} (presumably JUnit's timeout failure text). */
    private static final String UNIT_EXCEPTION_MESSAGE = "test timed out after";
    /** Number of status updates configured on the mock launcher for the task attempt under test. */
    private static final int NUM_UPDATES_FOR_TEST_TASK = 1200;
    /** Evaluation budget handed to {@link RetryRule}. */
    private static final int ASSERT_SPECULATIONS_COUNT_RETRIES = 3;
    /** Base configuration rebuilt before every test by {@link #setDefaultConf()}. */
    private Configuration defaultConf;
    /** Local file system obtained from {@link #defaultConf}; closed in {@link #tearDown()}. */
    private FileSystem localFs;
    /** Mock application master of the most recently created session. */
    MockDAGAppMaster mockApp;
    /** Container launcher of {@link #mockApp}. */
    MockDAGAppMaster.MockContainerLauncher mockLauncher;
    /** Re-runs {@link Retry}-annotated tests that fail for one of the recognised reasons. */
    @Rule
    public RetryRule rule = new RetryRule(ASSERT_SPECULATIONS_COUNT_RETRIES);
    /** Estimator implementation under test, supplied by the parameterized runner. */
    private Class<? extends TaskRuntimeEstimator> estimatorClass;

    /**
     * Rebuilds {@link #defaultConf} from scratch before each test: local mode with speculation
     * enabled, the parameterized estimator class, and the speculator tuning values that
     * {@code testBasicSpeculation} later reads back from the speculator.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the local file system
     *                          cannot be obtained
     */
    @Before
    public void setDefaultConf() {
        final Configuration conf = new Configuration(false);
        // Publish the field first so its state matches the original even if setup fails below.
        this.defaultConf = conf;

        conf.set("fs.defaultFS", "file:///");
        conf.setBoolean("tez.local.mode", true);
        conf.setBoolean("tez.am.speculation.enabled", true);
        conf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", 1.0f);
        conf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", 1.0f);

        // Obtaining the local file system is the only step that can raise IOException.
        try {
            this.localFs = FileSystem.getLocal(conf);
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }

        conf.set("tez.staging-dir", "target/" + TestSpeculation.class.getName() + "-tmpDir");
        conf.setClass("tez.am.task.estimator.class", this.estimatorClass, TaskRuntimeEstimator.class);

        // Speculator tuning knobs.
        conf.setInt("tez.am.minimum.allowed.speculative.tasks", 20);
        conf.setDouble("tez.am.proportion.total.tasks.speculatable", 0.2);
        conf.setDouble("tez.am.proportion.running.tasks.speculatable", 0.25);
        conf.setLong("tez.am.soonest.retry.after.no.speculate", 25L);
        conf.setLong("tez.am.soonest.retry.after.speculate", 50L);
        conf.setInt("tez.am.task.estimator.exponential.skip.initials", 2);
    }

    /**
     * Releases everything a test may have created: the local file system, the mock container
     * launcher and the mock application master.
     *
     * <p>Each resource is cleaned up independently, so a failure (or a resource that was never
     * created because setup or session creation failed) does not prevent the remaining ones
     * from being released. Cleanup failures are logged and never fail the test.
     */
    @After
    public void tearDown() {
        this.defaultConf = null;

        if (this.localFs != null) {
            try {
                this.localFs.close();
            }
            catch (Exception e) {
                LOG.warn("Failed to close the local file system", e);
            }
        }

        if (this.mockLauncher != null) {
            try {
                this.mockLauncher.shutdown();
            }
            catch (Exception e) {
                LOG.warn("Failed to shut down the mock container launcher", e);
            }
        }

        if (this.mockApp != null) {
            try {
                this.mockApp.close();
            }
            catch (Exception e) {
                LOG.warn("Failed to close the mock application master", e);
            }
        }
    }

    @Parameterized.Parameters(name="{index}: TaskEstimator(EstimatorClass {0})")
    public static Collection<Object[]> getTestParameters() {
        return Arrays.asList({SimpleExponentialTaskRuntimeEstimator.class}, {LegacyTaskRuntimeEstimator.class});
    }

    /**
     * Invoked by the parameterized runner once per row of {@link #getTestParameters()}.
     *
     * @param estimatorKlass estimator implementation that {@link #setDefaultConf()} writes into
     *                       the configuration
     */
    public TestSpeculation(Class<? extends TaskRuntimeEstimator> estimatorKlass) {
        estimatorClass = estimatorKlass;
    }

    /**
     * Starts a mock Tez session on top of {@link #defaultConf} and waits until its mock
     * application master is available, leaving scheduling switched off so that the caller can
     * configure the mock launcher before any task runs.
     *
     * @return the started client
     * @throws Exception if the client fails to start or the wait is interrupted
     */
    MockTezClient createTezSession() throws Exception {
        final TezConfiguration sessionConf = new TezConfiguration(this.defaultConf);
        final AtomicBoolean goFlag = new AtomicBoolean(false);
        final MockClock clock = new MockClock();

        final MockTezClient client = new MockTezClient(
            "testspeculation", sessionConf, true, null, null, clock, goFlag, false, false, 1, 2);
        client.start();

        // Captures mockApp/mockLauncher and keeps scheduling paused.
        syncWithMockAppLauncher(false, goFlag, client);
        return client;
    }

    /**
     * Waits, using the flag object as the monitor, until {@code mockAppLauncherGoFlag} becomes
     * true; then records the client's mock application master and its container launcher in
     * {@link #mockApp} / {@link #mockLauncher}, applies the requested scheduling mode and
     * notifies one thread waiting on the same flag.
     *
     * @param allowScheduling       value passed to the launcher's {@code startScheduling}
     * @param mockAppLauncherGoFlag flag and monitor shared with the mock application launcher
     * @param tezClient             client whose local mock application master is captured
     * @throws Exception if the wait is interrupted or the mock components cannot be obtained
     */
    void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag, MockTezClient tezClient) throws Exception {
        synchronized (mockAppLauncherGoFlag) {
            // Loop rather than a single wait() so that a wake-up without the flag set is ignored.
            while (!mockAppLauncherGoFlag.get()) {
                mockAppLauncherGoFlag.wait();
            }

            final MockDAGAppMaster app = tezClient.getLocalClient().getMockApp();
            this.mockApp = app;

            final MockDAGAppMaster.MockContainerLauncher launcher = app.getContainerLauncher();
            this.mockLauncher = launcher;
            launcher.startScheduling(allowScheduling);

            mockAppLauncherGoFlag.notify();
        }
    }

    /**
     * Runs a single-task vertex once per configured value of
     * {@code tez.am.legacy.speculative.single.task.vertex.timeout} and checks how many attempts
     * the task ends up with. When more than one attempt is expected, attempt 1 must be the
     * successful one and attempt 0 must have been terminated as an effective speculation.
     *
     * @throws Exception if session setup, DAG submission or waiting for completion fails
     */
    @Retry
    @Test(timeout=30000L)
    public void testSingleTaskSpeculation() throws Exception {
        // Single-task-vertex timeout -> expected number of attempts for the task.
        Map<Long, Integer> confToExpected = new HashMap<Long, Integer>();
        confToExpected.put(Long.MAX_VALUE >> 1, 1);
        confToExpected.put(100L, 2);
        confToExpected.put(-1L, 1);
        this.defaultConf.setLong("tez.am.soonest.retry.after.no.speculate", 50L);
        for (Map.Entry<Long, Integer> entry : confToExpected.entrySet()) {
            long singleTaskTimeout = entry.getKey();
            int expectedAttempts = entry.getValue();

            this.defaultConf.setLong("tez.am.legacy.speculative.single.task.vertex.timeout", singleTaskTimeout);
            DAG dag = DAG.create("test");
            org.apache.tez.dag.api.Vertex vA = org.apache.tez.dag.api.Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
            dag.addVertex(vA);
            MockTezClient tezClient = this.createTezSession();
            DAGClient dagClient = tezClient.submitDAG(dag);
            DAGImpl dagImpl = (DAGImpl)this.mockApp.getContext().getCurrentDAG();
            TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
            // Attempt 0 is the original attempt, attempt 1 the speculative one.
            TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
            TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
            Thread.sleep(200L);
            this.mockLauncher.setStatusUpdatesForTask(killedTaId, NUM_UPDATES_FOR_TEST_TASK);
            this.mockLauncher.startScheduling(true);
            dagClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
            TaskImpl task = dagImpl.getTask(killedTaId.getTaskID());
            Assert.assertEquals(expectedAttempts, task.getAttempts().size());
            if (expectedAttempts > 1) {
                Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getTaskAttemptID());
                TaskAttempt killedAttempt = task.getAttempt(killedTaId);
                // NOTE(review): the result of contains(...) is discarded, so this line checks
                // nothing. Confirm the exact diagnostic text produced for the killed attempt
                // before turning it into an assertion.
                Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
                Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
            }
            tezClient.stop();
        }
    }

    /**
     * Shared body of the basic speculation tests. Runs a five-task vertex in which attempt 0 of
     * task 0 is given {@code NUM_UPDATES_FOR_TEST_TASK} status updates, then verifies that the
     * task ended with two attempts, that attempt 1 succeeded, and that attempt 0 was terminated
     * as an effective speculation. Finally checks that the vertex speculator reports the tuning
     * values written by {@link #setDefaultConf()}.
     *
     * @param withProgress forwarded to the mock launcher's {@code updateProgress}; when true the
     *                     task, DAG and vertex {@code NUM_SPECULATIONS} counters must each be 1
     * @throws Exception if session setup, DAG submission or waiting for completion fails
     */
    public void testBasicSpeculation(boolean withProgress) throws Exception {
        final DAG testDag = DAG.create("test");
        final org.apache.tez.dag.api.Vertex vertexA =
            org.apache.tez.dag.api.Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
        testDag.addVertex(vertexA);

        final MockTezClient client = createTezSession();
        final DAGClient dagClient = client.submitDAG(testDag);
        final DAGImpl runningDag = (DAGImpl) this.mockApp.getContext().getCurrentDAG();

        // Attempt 0 of task 0 is expected to be killed, attempt 1 to succeed.
        final TezVertexID vertexId = TezVertexID.getInstance(runningDag.getID(), 0);
        final TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
        final TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);

        this.mockLauncher.updateProgress(withProgress);
        this.mockLauncher.setStatusUpdatesForTask(killedTaId, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());

        final TaskImpl speculatedTask = runningDag.getTask(killedTaId.getTaskID());
        // This message is what RetryRule matches on to decide whether to re-run the test.
        Assert.assertEquals(ASSERT_SPECULATIONS_COUNT_MSG, 2L, speculatedTask.getAttempts().size());
        Assert.assertEquals(successTaId, speculatedTask.getSuccessfulAttempt().getTaskAttemptID());

        final TaskAttempt killedAttempt = speculatedTask.getAttempt(killedTaId);
        // NOTE(review): the boolean returned by contains(...) is discarded, so nothing is
        // verified here; confirm the real diagnostic text before asserting on it.
        Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
        Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());

        if (withProgress) {
            Assert.assertEquals(1L, speculatedTask.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
            Assert.assertEquals(1L, runningDag.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
            final Vertex runtimeVertex = runningDag.getVertex(killedTaId.getVertexID());
            Assert.assertEquals(1L, runtimeVertex.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        }

        // The speculator must have picked up the values configured in setDefaultConf().
        final LegacySpeculator speculator = (LegacySpeculator) runningDag.getVertex(vertexA.getName()).getSpeculator();
        Assert.assertEquals(20L, (long) speculator.getMinimumAllowedSpeculativeTasks());
        Assert.assertEquals(0.2, (double) speculator.getProportionTotalTasksSpeculatable(), 0.0);
        Assert.assertEquals(0.25, (double) speculator.getProportionRunningTasksSpeculatable(), 0.0);
        Assert.assertEquals(25L, (long) speculator.getSoonestRetryAfterNoSpeculate());
        Assert.assertEquals(50L, (long) speculator.getSoonestRetryAfterSpeculate());

        client.stop();
    }

    /**
     * Basic speculation scenario with progress updates enabled on the mock launcher.
     *
     * @throws Exception propagated from {@link #testBasicSpeculation(boolean)}
     */
    @Retry
    @Test(timeout=30000L)
    public void testBasicSpeculationWithProgress() throws Exception {
        final boolean withProgress = true;
        testBasicSpeculation(withProgress);
    }

    /**
     * Basic speculation scenario with progress updates disabled on the mock launcher.
     *
     * @throws Exception propagated from {@link #testBasicSpeculation(boolean)}
     */
    @Retry
    @Test(timeout=30000L)
    public void testBasicSpeculationWithoutProgress() throws Exception {
        final boolean withProgress = false;
        testBasicSpeculation(withProgress);
    }

    /**
     * Checks that speculation can be switched off for one vertex. Vertex "A" has
     * {@code tez.am.speculation.enabled} set to {@code "false"} and feeds vertex "B", which
     * keeps the session-wide setting. After the run, "B" must report at least one speculation
     * and "A" none.
     *
     * @throws Exception if session setup, DAG submission or waiting for completion fails
     */
    @Retry
    @Test(timeout=30000L)
    public void testBasicSpeculationPerVertexConf() throws Exception {
        final String noSpecName = "A";
        final String specName = "B";

        final DAG testDag = DAG.create("test");
        final org.apache.tez.dag.api.Vertex noSpecVertex =
            org.apache.tez.dag.api.Vertex.create(noSpecName, ProcessorDescriptor.create("Proc.class"), 5);
        final org.apache.tez.dag.api.Vertex specVertex =
            org.apache.tez.dag.api.Vertex.create(specName, ProcessorDescriptor.create("Proc.class"), 5);
        noSpecVertex.setConf("tez.am.speculation.enabled", "false");
        testDag.addVertex(noSpecVertex);
        testDag.addVertex(specVertex);

        final EdgeProperty edgeProperty = EdgeProperty.create(
            EdgeProperty.DataMovementType.SCATTER_GATHER,
            EdgeProperty.DataSourceType.PERSISTED,
            EdgeProperty.SchedulingType.SEQUENTIAL,
            OutputDescriptor.create("O"),
            InputDescriptor.create("I"));
        testDag.addEdge(Edge.create(noSpecVertex, specVertex, edgeProperty));

        final MockTezClient client = createTezSession();
        final DAGClient dagClient = client.submitDAG(testDag);
        final DAGImpl runningDag = (DAGImpl) this.mockApp.getContext().getCurrentDAG();

        final TezVertexID vertexIdSpec = runningDag.getVertex(specName).getVertexId();
        final TezVertexID vertexIdNoSpec = runningDag.getVertex(noSpecName).getVertexId();

        // Attempt 0 of task 0 in each vertex gets the long run of status updates.
        final TezTaskAttemptID specAttempt = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexIdSpec, 0), 0);
        final TezTaskAttemptID noSpecAttempt = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexIdNoSpec, 0), 0);
        this.mockLauncher.setStatusUpdatesForTask(specAttempt, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.setStatusUpdatesForTask(noSpecAttempt, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.startScheduling(true);

        final Vertex vSpec = runningDag.getVertex(vertexIdSpec);
        final Vertex vNoSpec = runningDag.getVertex(vertexIdNoSpec);

        // Poll until the speculating vertex records a speculation; the only bound on this loop
        // is the @Test timeout.
        do {
            Thread.sleep(100L);
        } while (vSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue() <= 0L);

        dagClient.waitForCompletion();

        final long specCount = vSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue();
        Assert.assertTrue("Num Speculations is not higher than 0", specCount > 0L);
        Assert.assertEquals(0L, vNoSpec.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());

        client.stop();
    }

    /**
     * Scenario in which the speculative attempt does not help. Both attempt 0 and attempt 1 of
     * task 0 are given {@code NUM_UPDATES_FOR_TEST_TASK} status updates; attempt 0 must be the
     * successful one and attempt 1 must be terminated as an ineffective speculation, with
     * exactly one speculation counted at task, DAG and vertex level.
     *
     * @throws Exception if session setup, DAG submission or waiting for completion fails
     */
    @Retry
    @Test(timeout=30000L)
    public void testBasicSpeculationNotUseful() throws Exception {
        final DAG testDag = DAG.create("test");
        final org.apache.tez.dag.api.Vertex vertexA =
            org.apache.tez.dag.api.Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
        testDag.addVertex(vertexA);

        final MockTezClient client = createTezSession();
        final DAGClient dagClient = client.submitDAG(testDag);
        final DAGImpl runningDag = (DAGImpl) this.mockApp.getContext().getCurrentDAG();

        // Here the original attempt (0) wins and the speculative attempt (1) is killed.
        final TezVertexID vertexId = TezVertexID.getInstance(runningDag.getID(), 0);
        final TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
        final TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);

        this.mockLauncher.setStatusUpdatesForTask(successTaId, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.setStatusUpdatesForTask(killedTaId, NUM_UPDATES_FOR_TEST_TASK);
        this.mockLauncher.startScheduling(true);
        dagClient.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());

        final TaskImpl speculatedTask = runningDag.getTask(killedTaId.getTaskID());
        Assert.assertEquals(2L, speculatedTask.getAttempts().size());
        Assert.assertEquals(successTaId, speculatedTask.getSuccessfulAttempt().getTaskAttemptID());

        final TaskAttempt killedAttempt = speculatedTask.getAttempt(killedTaId);
        // NOTE(review): the boolean returned by contains(...) is discarded, so nothing is
        // verified here; confirm the real diagnostic text before asserting on it.
        Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
        Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());

        Assert.assertEquals(1L, speculatedTask.getCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        Assert.assertEquals(1L, runningDag.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());
        final Vertex runtimeVertex = runningDag.getVertex(killedTaId.getVertexID());
        Assert.assertEquals(1L, runtimeVertex.getAllCounters().findCounter(TaskCounter.NUM_SPECULATIONS).getValue());

        client.stop();
    }

    class RetryRule
    implements TestRule {
        private AtomicInteger retryCount;

        RetryRule(int retries) {
            this.retryCount = new AtomicInteger(retries);
        }

        public Statement apply(final Statement base, final Description description) {
            return new Statement(){

                public void evaluate() throws Throwable {
                    Throwable caughtThrowable = null;
                    while (RetryRule.this.retryCount.getAndDecrement() > 0) {
                        try {
                            base.evaluate();
                            return;
                        }
                        catch (Throwable t) {
                            caughtThrowable = t;
                            if (RetryRule.this.retryCount.get() > 0 && description.getAnnotation(Retry.class) != null) {
                                if (!(t instanceof AssertionError && t.getMessage().contains(TestSpeculation.ASSERT_SPECULATIONS_COUNT_MSG) || t instanceof Exception && t.getMessage().contains(TestSpeculation.UNIT_EXCEPTION_MESSAGE))) {
                                    throw caughtThrowable;
                                }
                                LOG.warn("{} : Failed. Retries remaining: ", (Object)description.getDisplayName(), (Object)RetryRule.this.retryCount.toString());
                                continue;
                            }
                            throw caughtThrowable;
                        }
                    }
                }
            };
        }
    }

    /**
     * Marks a test method as eligible for re-execution by {@link RetryRule}. Retained at runtime
     * because the rule looks the annotation up on the test description while the test runs.
     */
    @Retention(RetentionPolicy.RUNTIME)
    public static @interface Retry {
    }
}

