/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test.dag;

import com.google.common.primitives.Ints;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.TestInput;
import org.apache.tez.test.TestOutput;
import org.apache.tez.test.TestProcessor;

public class MultiAttemptDAG {
    /** Logger shared by this class and its nested committer / vertex-manager classes. */
    private static final Log LOG = LogFactory.getLog(MultiAttemptDAG.class);
    /**
     * Resource handed to every vertex built by {@code createDAG}.
     */
    // NOTE(review): presumably 100 MB of memory and 0 vcores, per YARN's Resource.newInstance(memory, vCores) — confirm.
    static Resource defaultResource = Resource.newInstance((int)100, (int)0);
    /** Configuration key read by {@code createDAG} to obtain the number of tasks per vertex. */
    public static String MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS = "tez.multi-attempt-dag.vertex.num-tasks";
    /** Tasks per vertex used when no configuration is supplied or the key above is unset. */
    public static int MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT = 2;
    /**
     * Configuration key for opting into the failing committer.
     */
    // NOTE(review): not read anywhere in this file; presumably consumed by callers that attach FailingOutputCommitter — confirm.
    public static String MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER = "tez.multi-attempt-dag.use-failing-committer";
    /** Default paired with the key above; likewise not referenced within this file. */
    public static boolean MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER_DEFAULT = false;

    /**
     * Builds a linear three-vertex DAG ({@code v1 -> v2 -> v3}) whose vertices are each
     * managed by a {@link FailOnAttemptVertexManagerPlugin}.
     *
     * <p>The plugin payloads are the strings {@code "1"}, {@code "2"} and {@code "3"} for
     * {@code v1}, {@code v2} and {@code v3} respectively; the plugin parses that value as the
     * DAG attempt number on which the vertex is allowed to schedule its tasks. Both edges are
     * {@code SCATTER_GATHER} / {@code PERSISTED} / {@code SEQUENTIAL} and use the test
     * input/output descriptors.
     *
     * @param name name given to the created DAG
     * @param conf optional configuration; when non-null it supplies the per-vertex task count
     *             (key {@code MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS}) and is serialized into the
     *             payload passed to the processor, input and output descriptors. When
     *             {@code null} the default task count is used and that payload is {@code null}.
     * @return the assembled DAG
     * @throws Exception if the configuration cannot be converted into a user payload
     */
    public static DAG createDAG(String name, Configuration conf) throws Exception {
        byte[] payload = null;
        int taskCount = MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT;
        if (conf != null) {
            taskCount = conf.getInt(MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS, MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT);
            payload = TezUtils.createUserPayloadFromConf(conf);
        }

        // The payload is decoded in a different JVM, so encode it with an explicit charset
        // rather than whatever the client's platform default happens to be.
        Charset utf8 = Charset.forName("UTF-8");
        String pluginClassName = FailOnAttemptVertexManagerPlugin.class.getName();

        DAG dag = new DAG(name);
        Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
        Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
        Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);

        // Each vertex carries the attempt number on which its manager will schedule tasks.
        v1.setVertexManagerPlugin(new VertexManagerPluginDescriptor(pluginClassName).setUserPayload("1".getBytes(utf8)));
        v2.setVertexManagerPlugin(new VertexManagerPluginDescriptor(pluginClassName).setUserPayload("2".getBytes(utf8)));
        v3.setVertexManagerPlugin(new VertexManagerPluginDescriptor(pluginClassName).setUserPayload("3".getBytes(utf8)));

        dag.addVertex(v1).addVertex(v2).addVertex(v3);
        dag.addEdge(new Edge(v1, v2, new EdgeProperty(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                TestOutput.getOutputDesc(payload),
                TestInput.getInputDesc(payload))));
        dag.addEdge(new Edge(v2, v3, new EdgeProperty(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                TestOutput.getOutputDesc(payload),
                TestInput.getInputDesc(payload))));
        return dag;
    }

    /**
     * Convenience overload that builds the multi-attempt DAG under the fixed name
     * {@code "SimpleVTestDAG"}.
     *
     * @param conf optional configuration forwarded unchanged to the two-argument overload
     * @return the assembled DAG
     * @throws Exception propagated from the two-argument overload
     */
    public static DAG createDAG(Configuration conf) throws Exception {
        final String defaultDagName = "SimpleVTestDAG";
        return createDAG(defaultDagName, conf);
    }

    /**
     * Logical output that produces nothing. Its only action is to request an initial
     * memory allocation of size 1 during initialization, registering itself as the
     * callback; every other method is an empty implementation or returns {@code null}.
     */
    public static class NoOpOutput implements LogicalOutput, MemoryUpdateCallback {

        /** Ignores the supplied physical output count. */
        public void setNumPhysicalOutputs(int numOutputs) {
            // intentionally empty
        }

        /**
         * Requests an initial memory allocation of size 1 with this object as the callback.
         *
         * @param outputContext context used to issue the memory request
         * @return always {@code null}; no events are generated
         */
        public List<Event> initialize(TezOutputContext outputContext) throws Exception {
            final long requestedSize = 1L;
            outputContext.requestInitialMemory(requestedSize, this);
            return null;
        }

        /** Nothing to start. */
        public void start() throws Exception {
            // intentionally empty
        }

        /**
         * @return always {@code null}; this output exposes no writer
         */
        public Writer getWriter() throws Exception {
            return null;
        }

        /** Discards any events delivered to this output. */
        public void handleEvents(List<Event> outputEvents) {
            // intentionally empty
        }

        /**
         * @return always {@code null}; no events are generated on close
         */
        public List<Event> close() throws Exception {
            return null;
        }

        /** Ignores the assigned memory size. */
        public void memoryAssigned(long assignedSize) {
            // intentionally empty
        }
    }

    /**
     * Output committer that can be configured to kill the running JVM when asked to commit.
     *
     * <p>The behaviour is controlled by a {@link FailingOutputCommitterConfig} carried in the
     * committer's user payload. When {@code failOnCommit} is set, {@link #commitOutput()} logs
     * a message and calls {@code Runtime.halt(-1)}; otherwise every method is a no-op.
     */
    public static class FailingOutputCommitter
    extends OutputCommitter {
        /** Whether {@link #commitOutput()} should halt the JVM; populated by {@link #initialize}. */
        boolean failOnCommit = false;

        /**
         * Reads the failure flag from the context's user payload.
         *
         * @param context committer context supplying the user payload; a missing (null or
         *                empty) payload leaves the committer in its non-failing default
         * @throws Exception if a non-empty payload cannot be decoded
         */
        public void initialize(OutputCommitterContext context) throws Exception {
            FailingOutputCommitterConfig config = new FailingOutputCommitterConfig();
            config.fromUserPayload(context.getUserPayload());
            this.failOnCommit = config.failOnCommit;
        }

        /** No setup is required. */
        public void setupOutput() throws Exception {
        }

        /**
         * Halts the JVM with status {@code -1} when configured to fail; otherwise does nothing.
         * {@code halt} is used rather than {@code exit}, so shutdown hooks do not run.
         */
        public void commitOutput() throws Exception {
            if (this.failOnCommit) {
                LOG.info("Committer causing AM to shutdown");
                Runtime.getRuntime().halt(-1);
            }
        }

        /** Nothing to abort. */
        public void abortOutput(VertexStatus.State finalState) throws Exception {
        }

        /**
         * Serializable configuration for {@link FailingOutputCommitter}: a single boolean,
         * encoded as the int {@code 1} or {@code 0} via Guava's {@code Ints} helpers.
         */
        public static class FailingOutputCommitterConfig {
            /** Whether the committer should halt the JVM on commit. */
            boolean failOnCommit;

            /** Creates a configuration that does not fail on commit. */
            public FailingOutputCommitterConfig() {
                this(false);
            }

            /**
             * @param failOnCommit whether the committer should halt the JVM on commit
             */
            public FailingOutputCommitterConfig(boolean failOnCommit) {
                this.failOnCommit = failOnCommit;
            }

            /**
             * @return the flag encoded with {@code Ints.toByteArray} as {@code 1} (fail) or
             *         {@code 0} (do not fail)
             */
            public byte[] toUserPayload() {
                return Ints.toByteArray(this.failOnCommit ? 1 : 0);
            }

            /**
             * Restores the flag from a payload produced by {@link #toUserPayload()}.
             *
             * <p>A {@code null} or empty payload means "no configuration supplied" and resets
             * the flag to the default {@code false} instead of failing inside
             * {@code Ints.fromByteArray}. A non-empty payload is still handed to
             * {@code Ints.fromByteArray} unchanged, so a malformed (too short) payload
             * continues to be reported as an error by that call.
             *
             * @param userPayload encoded flag, or {@code null}/empty for the default
             */
            public void fromUserPayload(byte[] userPayload) {
                if (userPayload == null || userPayload.length == 0) {
                    this.failOnCommit = false;
                    return;
                }
                // Any non-zero value is treated as "fail".
                this.failOnCommit = Ints.fromByteArray(userPayload) != 0;
            }
        }
    }

    /**
     * Vertex manager that decides, once all source tasks have completed, whether to halt the
     * JVM or to schedule this vertex's tasks, based on the current DAG attempt number.
     *
     * <p>The user payload holds a decimal attempt number (the "successful attempt"):
     * <ul>
     *   <li>if it is greater than the current DAG attempt, the JVM is halted with status
     *       {@code -1};</li>
     *   <li>if it equals the current DAG attempt, every task of the vertex is scheduled;</li>
     *   <li>if it is smaller, nothing is scheduled by this plugin.</li>
     * </ul>
     *
     * <p>The decision is taken at most once per plugin instance: {@code maybeScheduleTasks}
     * is synchronized and guarded by the {@code tasksScheduled} flag, and completions are
     * counted in an {@link AtomicInteger}.
     */
    public static class FailOnAttemptVertexManagerPlugin
    implements VertexManagerPlugin {
        /** Charset used to decode the attempt number carried in the user payload. */
        private static final Charset PAYLOAD_CHARSET = Charset.forName("UTF-8");

        /** Total number of tasks across all input vertices, computed in {@link #initialize}. */
        private int numSourceTasks = 0;
        /** Number of source-task completions observed so far. */
        private final AtomicInteger numCompletions = new AtomicInteger();
        private VertexManagerPluginContext context;
        /** Set once the halt-or-schedule decision has been made; prevents repeating it. */
        private boolean tasksScheduled = false;

        /**
         * Stores the context and sums the task counts of all input vertices.
         *
         * @param context plugin context supplied by the framework
         */
        public void initialize(VertexManagerPluginContext context) {
            this.context = context;
            for (String input : context.getInputVertexEdgeProperties().keySet()) {
                LOG.info("Adding sourceTasks for Vertex " + input);
                this.numSourceTasks += context.getVertexNumTasks(input);
                LOG.info("Current numSourceTasks=" + this.numSourceTasks);
            }
        }

        /**
         * Accounts for source completions that were already known when the vertex started,
         * then evaluates whether the scheduling decision can be made.
         *
         * @param completions completed task indices keyed by source vertex name; may be
         *                    {@code null}
         */
        public void onVertexStarted(Map<String, List<Integer>> completions) {
            if (completions != null) {
                for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
                    int completed = entry.getValue().size();
                    LOG.info("Received completion events on vertexStarted, vertex=" + entry.getKey() + ", completions=" + completed);
                    this.numCompletions.addAndGet(completed);
                }
            }
            this.maybeScheduleTasks();
        }

        /**
         * Makes the one-time halt-or-schedule decision once the number of observed
         * completions has reached the number of source tasks.
         */
        private synchronized void maybeScheduleTasks() {
            if (this.tasksScheduled || this.numCompletions.get() < this.numSourceTasks) {
                return;
            }
            this.tasksScheduled = true;

            String payload = new String(this.context.getUserPayload(), PAYLOAD_CHARSET);
            int successAttemptId = Integer.parseInt(payload);
            // Read once so the logged value is the one actually compared below.
            int currentAttempt = this.context.getDAGAttemptNumber();
            LOG.info("Checking whether to crash AM or schedule tasks, successfulAttemptID=" + successAttemptId + ", currentAttempt=" + currentAttempt);

            if (successAttemptId > currentAttempt) {
                // halt (not exit): terminate immediately without running shutdown hooks.
                Runtime.getRuntime().halt(-1);
            } else if (successAttemptId == currentAttempt) {
                String vertexName = this.context.getVertexName();
                LOG.info("Scheduling tasks for vertex=" + vertexName);
                int numTasks = this.context.getVertexNumTasks(vertexName);
                List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
                for (int i = 0; i < numTasks; ++i) {
                    scheduledTasks.add(Integer.valueOf(i));
                }
                this.context.scheduleVertexTasks(scheduledTasks);
            }
            // successAttemptId < currentAttempt: deliberately schedules nothing.
        }

        /**
         * Records one more source-task completion and re-evaluates the scheduling decision.
         *
         * @param srcVertexName name of the source vertex whose task completed
         * @param taskId        index of the completed task
         */
        public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
            LOG.info("Received completion events for source task, vertex=" + srcVertexName + ", taskIdx=" + taskId);
            this.numCompletions.incrementAndGet();
            this.maybeScheduleTasks();
        }

        /** Vertex manager events are ignored. */
        public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
        }

        /** Root-input initialization is ignored. */
        public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
        }
    }
}

