/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.test.TestInput;
import org.apache.tez.test.TestOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test processor that can sleep, fail on demand (deterministically or randomly) and verify the
 * value it produces.
 *
 * <p>All behaviour is driven by configuration keys read from the user payload in
 * {@link #initialize()}. When the payload is absent or empty every field keeps its default and
 * {@link #run(Map, Map)} simply sums its {@link TestInput} values and writes the result to each
 * {@link TestOutput}.
 */
public class TestProcessor extends AbstractLogicalIOProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);

    /** Key of the maximum-failed-attempts setting consulted by the random-failure mode. */
    private static final String MAX_FAILED_ATTEMPTS_KEY = "tez.am.task.max.failed.attempts";

    /** Fallback used when {@link #MAX_FAILED_ATTEMPTS_KEY} is not present in the configuration. */
    private static final int MAX_FAILED_ATTEMPTS_DEFAULT = 4;

    /** Per-vertex boolean key (see {@link #getVertexConfName(String, String)}) enabling failures. */
    public static String TEZ_FAILING_PROCESSOR_DO_FAIL = "tez.failing-processor.do-fail";

    /** Boolean key, read without any vertex suffix, enabling the random-failure mode. */
    public static String TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL = "tez.failing-processor.do-random-fail";

    /** Float key, read without any vertex suffix, holding the random-failure probability. */
    public static String TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY = "tez.failing-processor.random-fail-probability";

    /** Per-vertex long key holding the number of milliseconds to sleep in {@link #run(Map, Map)}. */
    public static String TEZ_FAILING_PROCESSOR_SLEEP_MS = "tez.failing-processor.sleep-ms";

    /** Per-vertex key holding the collection of task indices that should fail. */
    public static String TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX = "tez.failing-processor.failing-task-index";

    /** Per-vertex int key: attempts numbered up to and including this value fail. */
    public static String TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT = "tez.failing-processor.failing-upto-task-attempt";

    /**
     * Per-vertex, per-task int key (see {@link #getVertexConfName(String, String, int)}) holding
     * the expected output value; {@code -1} means "not set".
     */
    public static String TEZ_FAILING_PROCESSOR_VERIFY_VALUE = "tez.failing-processor.verify-value";

    /** Per-vertex key holding the collection of task indices whose output is verified. */
    public static String TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX = "tez.failing-processor.verify-task-index";

    /** Configuration decoded from the user payload; stays {@code null} when there is no payload. */
    Configuration conf;
    boolean doFail = false;
    boolean doRandomFail = false;
    float randomFailProbability = 0.0f;
    long sleepMs;
    Set<Integer> failingTaskIndices = Sets.newHashSet();
    int failingTaskAttemptUpto = 0;

    /**
     * Wildcard sentinel: when contained in {@link #failingTaskIndices} every task index matches,
     * and when equal to {@link #failingTaskAttemptUpto} every attempt number matches.
     */
    Integer failAll = Integer.valueOf(-1);
    int verifyValue = -1;
    Set<Integer> verifyTaskIndices = Sets.newHashSet();

    /**
     * Creates the processor for the given context.
     *
     * @param context processor context handed to the superclass
     */
    public TestProcessor(ProcessorContext context) {
        super(context);
    }

    /**
     * Builds a descriptor for this processor class carrying the given payload.
     *
     * @param payload user payload to attach; {@code null} is replaced by
     *     {@code UserPayload.create(null)}
     * @return descriptor naming {@code TestProcessor} with the payload set
     */
    public static ProcessorDescriptor getProcDesc(UserPayload payload) {
        UserPayload effectivePayload = payload == null ? UserPayload.create(null) : payload;
        // Cast retained from the original: the declared return type of setUserPayload is not
        // visible from this file.
        return (ProcessorDescriptor)ProcessorDescriptor.create(TestProcessor.class.getName()).setUserPayload(effectivePayload);
    }

    /**
     * Reports a non-fatal failure to the context and then throws.
     *
     * @param msg diagnostic text used both for the report and as the exception message
     * @throws RuntimeException always
     */
    void throwException(String msg) {
        RuntimeException failure = new RuntimeException(msg);
        this.getContext().reportFailure(TaskFailureType.NON_FATAL, failure, msg);
        throw failure;
    }

    /**
     * Derives the vertex-scoped form of a configuration key.
     *
     * @param confName base key
     * @param vertexName vertex name appended to the key
     * @return {@code confName + "." + vertexName}
     */
    public static String getVertexConfName(String confName, String vertexName) {
        return confName + "." + vertexName;
    }

    /**
     * Derives the vertex- and task-scoped form of a configuration key.
     *
     * @param confName base key
     * @param vertexName vertex name appended to the key
     * @param taskIndex task index appended after the vertex name
     * @return {@code confName + "." + vertexName + "." + taskIndex}
     */
    public static String getVertexConfName(String confName, String vertexName, int taskIndex) {
        return confName + "." + vertexName + "." + taskIndex;
    }

    /**
     * Loads the failure, sleep and verification settings from the user payload.
     *
     * <p>Does nothing when the context has no user payload or the payload is empty.
     *
     * @throws Exception if the payload cannot be turned into a configuration or an index entry is
     *     not a valid integer
     */
    public void initialize() throws Exception {
        UserPayload payload = this.getContext().getUserPayload();
        if (payload == null || !payload.hasPayload()) {
            return;
        }
        String vName = this.getContext().getTaskVertexName();
        this.conf = TezUtils.createConfFromUserPayload(payload);

        this.verifyValue = this.conf.getInt(TestProcessor.getVertexConfName(TEZ_FAILING_PROCESSOR_VERIFY_VALUE, vName, this.getContext().getTaskIndex()), -1);
        if (this.verifyValue != -1) {
            LOG.info("Verify value: {}", this.verifyValue);
            for (String verifyIndex : this.conf.getTrimmedStringCollection(TestProcessor.getVertexConfName(TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, vName))) {
                LOG.info("Adding verify task index: {}", verifyIndex);
                this.verifyTaskIndices.add(Integer.valueOf(verifyIndex));
            }
        }

        this.doFail = this.conf.getBoolean(TestProcessor.getVertexConfName(TEZ_FAILING_PROCESSOR_DO_FAIL, vName), false);
        this.sleepMs = this.conf.getLong(TestProcessor.getVertexConfName(TEZ_FAILING_PROCESSOR_SLEEP_MS, vName), 0L);
        LOG.info("doFail: {}", this.doFail);
        if (this.doFail) {
            for (String failingIndex : this.conf.getTrimmedStringCollection(TestProcessor.getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, vName))) {
                LOG.info("Adding failing task index: {}", failingIndex);
                this.failingTaskIndices.add(Integer.valueOf(failingIndex));
            }
            this.failingTaskAttemptUpto = this.conf.getInt(TestProcessor.getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, vName), 0);
            LOG.info("Adding failing attempt : {} dag: {}", this.failingTaskAttemptUpto, this.getContext().getDAGName());
        }

        // The random-failure keys are looked up as-is, without a vertex suffix.
        this.doRandomFail = this.conf.getBoolean(TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL, false);
        this.randomFailProbability = this.conf.getFloat(TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY, 0.0f);
        LOG.info("doRandomFail: {}", this.doRandomFail);
        LOG.info("randomFailProbability: {}", this.randomFailProbability);
    }

    /**
     * Ignores all events.
     *
     * @param processorEvents events delivered to this processor; unused
     */
    public void handleEvents(List<Event> processorEvents) {
    }

    /** Does nothing. */
    public void close() throws Exception {
    }

    /**
     * Starts all inputs and outputs, sleeps, optionally fails, then sums the inputs, writes the
     * sum to the outputs and optionally verifies it.
     *
     * @param inputs logical inputs keyed by name
     * @param outputs logical outputs keyed by name
     * @throws RuntimeException when a configured or random failure triggers, or when the computed
     *     value does not match the expected one
     * @throws Exception anything raised while starting, reading, writing or sleeping
     */
    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
        LOG.info("Sleeping ms: {}", this.sleepMs);
        for (LogicalInput input : inputs.values()) {
            input.start();
        }
        for (LogicalOutput output : outputs.values()) {
            output.start();
        }
        Thread.sleep(this.sleepMs);

        // Random mode takes precedence: when it is on, the deterministic settings are not consulted.
        if (this.doRandomFail) {
            this.failRandomlyIfUnlucky();
        } else {
            this.failIfConfigured();
        }

        int sum = this.readInputs(inputs);
        this.writeOutputs(outputs, sum);
        LOG.info("Output for DAG: {} vertex: {} task: {} attempt: {} is: {}", this.getContext().getDAGName(), this.getContext().getTaskVertexName(), this.getContext().getTaskIndex(), this.getContext().getTaskAttemptNumber(), sum);
        this.verifyOutput(sum);
    }

    /** Throws when failing is enabled and both the task index and the attempt number match. */
    private void failIfConfigured() {
        if (!this.doFail) {
            return;
        }
        boolean indexMatches = this.failingTaskIndices.contains(this.failAll)
            || this.failingTaskIndices.contains(Integer.valueOf(this.getContext().getTaskIndex()));
        if (!indexMatches) {
            return;
        }
        // intValue() makes the numeric comparison explicit instead of relying on implicit
        // unboxing in an int == Integer expression.
        boolean attemptMatches = this.failingTaskAttemptUpto == this.failAll.intValue()
            || this.failingTaskAttemptUpto >= this.getContext().getTaskAttemptNumber();
        if (!attemptMatches) {
            return;
        }
        String msg = "FailingProcessor: " + this.describeTask();
        LOG.info(msg);
        this.throwException(msg);
    }

    /** Rolls a random number and throws when it falls below the configured probability. */
    private void failRandomlyIfUnlucky() {
        int taskAttemptNumber = this.getContext().getTaskAttemptNumber();
        int maxFailedAttempt = this.conf.getInt(MAX_FAILED_ATTEMPTS_KEY, MAX_FAILED_ATTEMPTS_DEFAULT);
        // Attempts numbered maxFailedAttempt - 1 or higher never roll.
        if (taskAttemptNumber >= maxFailedAttempt - 1) {
            return;
        }
        float rollNumber = (float)Math.random();
        LOG.info("FailingProcessor random fail turned on. Do a roll: " + this.describeTask() + " maxFailedAttempt: " + maxFailedAttempt + " rollNumber: " + rollNumber + " randomFailProbability " + this.randomFailProbability);
        if (rollNumber < this.randomFailProbability) {
            String msg = "FailingProcessor: rollNumber < randomFailProbability. Do fail.";
            LOG.info(msg);
            this.throwException(msg);
        }
    }

    /** Returns attempt number + 1 plus the value read from every {@link TestInput}. */
    private int readInputs(Map<String, LogicalInput> inputs) throws Exception {
        if (!inputs.isEmpty()) {
            LOG.info("Reading input of current FailingProcessor: " + this.describeTaskWithVertex());
        }
        int sum = this.getContext().getTaskAttemptNumber() + 1;
        LOG.info("initializing vertex= {} taskIndex: {} taskAttempt: {} sum= {}", this.getContext().getTaskVertexName(), this.getContext().getTaskIndex(), this.getContext().getTaskAttemptNumber(), sum);
        for (Map.Entry<String, LogicalInput> entry : inputs.entrySet()) {
            LogicalInput candidate = entry.getValue();
            if (!(candidate instanceof TestInput)) {
                LOG.info("Ignoring non TestInput: {} inputClass= {}", entry.getKey(), candidate.getClass().getSimpleName());
                continue;
            }
            int inputValue = ((TestInput)candidate).doRead();
            LOG.info("Reading input: {} inputValue= {}", entry.getKey(), inputValue);
            sum += inputValue;
        }
        return sum;
    }

    /** Writes {@code sum} to every {@link TestOutput}, skipping outputs of any other type. */
    private void writeOutputs(Map<String, LogicalOutput> outputs, int sum) throws Exception {
        if (!outputs.isEmpty()) {
            LOG.info("Writing output of current FailingProcessor: " + this.describeTaskWithVertex());
        }
        for (Map.Entry<String, LogicalOutput> entry : outputs.entrySet()) {
            LogicalOutput candidate = entry.getValue();
            if (!(candidate instanceof TestOutput)) {
                LOG.info("Ignoring non TestOutput: {} outputClass= {}", entry.getKey(), candidate.getClass().getSimpleName());
                continue;
            }
            LOG.info("Writing output: {} sum= {}", entry.getKey(), sum);
            ((TestOutput)candidate).write(sum);
        }
    }

    /** Throws when this task is listed for verification and {@code sum} differs from the expected value. */
    private void verifyOutput(int sum) {
        if (!this.verifyTaskIndices.contains(Integer.valueOf(this.getContext().getTaskIndex()))) {
            return;
        }
        if (this.verifyValue != -1 && this.verifyValue != sum) {
            String msg = "Expected output mismatch of current FailingProcessor: " + this.describeTaskWithVertex()
                + "\nExpected output: " + this.verifyValue + " got: " + sum;
            this.throwException(msg);
        } else {
            LOG.info("Verified output for DAG: {} vertex: {} task: {} attempt: {} is: {}", this.getContext().getDAGName(), this.getContext().getTaskVertexName(), this.getContext().getTaskIndex(), this.getContext().getTaskAttemptNumber(), sum);
        }
    }

    /** Formats unique identifier, DAG name, task index and attempt number for diagnostics. */
    private String describeTask() {
        return this.getContext().getUniqueIdentifier() + " dag: " + this.getContext().getDAGName() + " taskIndex: " + this.getContext().getTaskIndex() + " taskAttempt: " + this.getContext().getTaskAttemptNumber();
    }

    /** Same as {@link #describeTask()} with the vertex name inserted after the DAG name. */
    private String describeTaskWithVertex() {
        return this.getContext().getUniqueIdentifier() + " dag: " + this.getContext().getDAGName() + " vertex: " + this.getContext().getTaskVertexName() + " taskIndex: " + this.getContext().getTaskIndex() + " taskAttempt: " + this.getContext().getTaskAttemptNumber();
    }
}

