/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.ScalingAllocator;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Test for {@link LogicalIOProcessorRuntimeTask} that drives tasks built from the
 * stub {@link TestInput}, {@link TestOutput} and {@link TestProcessor} classes
 * declared below and checks which of them get started/run and which vertex
 * parallelism each context reports.
 *
 * <p>The stubs record what happened to them in {@code public static} fields, so
 * the assertions in this class read those fields directly.
 */
public class TestLogicalIOProcessorRuntimeTask {
    /**
     * Runs two tasks one after the other, sharing the same configuration,
     * umbilical mock, service metadata map and started-inputs multimap, and checks
     * after each run:
     * <ul>
     *   <li>the processor ran once per task;</li>
     *   <li>{@link TestInput#start()} was invoked exactly once overall and
     *       {@link TestOutput#start()} never;</li>
     *   <li>the processor, input and output contexts of each task report that
     *       task's own parallelism (30, then 10).</li>
     * </ul>
     *
     * <p>NOTE(review): the input not being started again for the second task is
     * presumably a consequence of both tasks sharing {@code startedInputsMap};
     * confirm against {@link LogicalIOProcessorRuntimeTask}.
     *
     * @throws Exception if creating, initializing, running or closing a task fails
     */
    @Test
    public void testAutoStart() throws Exception {
        // The stubs keep their counters in static fields. Clear them so that the
        // absolute values asserted below do not depend on an earlier run of this
        // test (or any other user of the stubs) in the same JVM.
        resetStubState();

        TezDAGID dagId = this.createTezDagId();
        TezVertexID vertexId = this.createTezVertexId(dagId);
        // NOTE(review): raw types are kept on purpose. The type arguments expected
        // by the LogicalIOProcessorRuntimeTask constructor are not visible from
        // this file; parameterize these once they have been confirmed.
        Map serviceConsumerMetadata = new HashMap();
        Multimap startedInputsMap = HashMultimap.create();
        TezUmbilical umbilical = Mockito.mock(TezUmbilical.class);
        Configuration tezConf = new TezConfiguration();
        tezConf.set("tez.task.scale.memory.allocator.class", ScalingAllocator.class.getName());

        TezTaskAttemptID taId1 = this.createTaskAttemptID(vertexId, 1);
        TaskSpec task1 = this.createTaskSpec(taId1, "dag1", "vertex1", 30);
        TezTaskAttemptID taId2 = this.createTaskAttemptID(vertexId, 2);
        TaskSpec task2 = this.createTaskSpec(taId2, "dag2", "vertex1", 10);

        LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(
                task1, 0, tezConf, null, umbilical, serviceConsumerMetadata, startedInputsMap, null);
        runToCompletion(lio1);
        Assert.assertEquals(1, TestProcessor.runCount);
        Assert.assertEquals(1, TestInput.startCount);
        Assert.assertEquals(0, TestOutput.startCount);
        Assert.assertEquals(30, TestInput.vertexParallelism);
        Assert.assertEquals(0, TestOutput.vertexParallelism);
        assertContextParallelism(30, lio1);

        LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(
                task2, 0, tezConf, null, umbilical, serviceConsumerMetadata, startedInputsMap, null);
        runToCompletion(lio2);
        Assert.assertEquals(2, TestProcessor.runCount);
        // The input was not started a second time, so it still holds the
        // parallelism captured during the first task's start().
        Assert.assertEquals(1, TestInput.startCount);
        Assert.assertEquals(0, TestOutput.startCount);
        Assert.assertEquals(30, TestInput.vertexParallelism);
        Assert.assertEquals(0, TestOutput.vertexParallelism);
        assertContextParallelism(10, lio2);
    }

    /** Zeroes every static field the stub classes use to record activity. */
    private static void resetStubState() {
        TestProcessor.runCount = 0;
        TestInput.startCount = 0;
        TestInput.vertexParallelism = 0;
        TestOutput.startCount = 0;
        TestOutput.vertexParallelism = 0;
    }

    /**
     * Drives a task through initialize, run and close, in that order.
     *
     * @param task the task to drive
     * @throws Exception whatever any of the three lifecycle calls throws
     */
    private static void runToCompletion(LogicalIOProcessorRuntimeTask task) throws Exception {
        task.initialize();
        task.run();
        task.close();
    }

    /**
     * Asserts that the processor context, the first input context and the first
     * output context of {@code task} all report {@code expected} as the vertex
     * parallelism.
     *
     * @param expected the parallelism the task spec was created with
     * @param task the task whose contexts are inspected
     */
    private static void assertContextParallelism(int expected, LogicalIOProcessorRuntimeTask task) {
        Assert.assertEquals(expected, task.getProcessorContext().getVertexParallelism());
        Assert.assertEquals(expected,
                ((InputContext) task.getInputContexts().iterator().next()).getVertexParallelism());
        Assert.assertEquals(expected,
                ((OutputContext) task.getOutputContexts().iterator().next()).getVertexParallelism());
    }

    /**
     * Builds a task spec wired to {@link TestProcessor} with one {@link TestInput}
     * and one {@link TestOutput}.
     *
     * @param taskAttemptID attempt id for the task
     * @param dagName DAG name stored in the spec
     * @param vertexName vertex name stored in the spec
     * @param parallelism vertex parallelism stored in the spec
     * @return the new task spec (its last constructor argument is passed as {@code null})
     */
    private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID, String dagName, String vertexName, int parallelism) {
        return new TaskSpec(taskAttemptID, dagName, vertexName, parallelism,
                this.createProcessorDescriptor(), this.createInputSpecList(), this.createOutputSpecList(), null);
    }

    /**
     * @return a mutable single-element list holding an input spec named
     *         {@code "inedge"} that is backed by {@link TestInput}
     */
    private List<InputSpec> createInputSpecList() {
        InputDescriptor descriptor = InputDescriptor.create(TestInput.class.getName());
        // Pass the element itself: casting the array to Object[] makes the list's
        // element type Object, which does not match the declared return type.
        return Lists.newArrayList(new InputSpec("inedge", descriptor, 1));
    }

    /**
     * @return a mutable single-element list holding an output spec named
     *         {@code "outedge"} that is backed by {@link TestOutput}
     */
    private List<OutputSpec> createOutputSpecList() {
        OutputDescriptor descriptor = OutputDescriptor.create(TestOutput.class.getName());
        // See createInputSpecList() for why the element is passed directly.
        return Lists.newArrayList(new OutputSpec("outedge", descriptor, 1));
    }

    /** @return a processor descriptor naming {@link TestProcessor} */
    private ProcessorDescriptor createProcessorDescriptor() {
        return ProcessorDescriptor.create(TestProcessor.class.getName());
    }

    /**
     * Creates an attempt id under {@code vertexId}. {@code taskIndex} is used both
     * as the task's index and as the second argument of
     * {@code TezTaskAttemptID.getInstance}.
     *
     * @param vertexId vertex the task belongs to
     * @param taskIndex value used for both ids
     * @return the attempt id
     */
    private TezTaskAttemptID createTaskAttemptID(TezVertexID vertexId, int taskIndex) {
        TezTaskID taskId = TezTaskID.getInstance(vertexId, taskIndex);
        return TezTaskAttemptID.getInstance(taskId, taskIndex);
    }

    /**
     * @param dagId DAG the vertex belongs to
     * @return a vertex id created from {@code dagId} and the fixed value 1
     */
    private TezVertexID createTezVertexId(TezDAGID dagId) {
        return TezVertexID.getInstance(dagId, 1);
    }

    /** @return a DAG id created from the fixed values {@code "2000"}, 100 and 1 */
    private TezDAGID createTezDagId() {
        return TezDAGID.getInstance("2000", 100, 1);
    }

    /**
     * Stub output that counts {@link #start()} calls and remembers the vertex
     * parallelism its context reported at that moment.
     *
     * <p>The counters are plain {@code volatile} ints; {@code ++} on them is not
     * atomic, so they are only reliable while start() is not called concurrently.
     */
    public static class TestOutput
    extends AbstractLogicalOutput {
        /** Number of times {@link #start()} has been invoked on any instance. */
        public static volatile int startCount = 0;
        /** Parallelism read from the context during the most recent {@link #start()}. */
        public static volatile int vertexParallelism;

        public TestOutput(OutputContext outputContext, int numPhysicalOutputs) {
            super(outputContext, numPhysicalOutputs);
        }

        /** Requests zero bytes of initial memory with no callback; emits no events. */
        @Override
        public List<Event> initialize() throws Exception {
            this.getContext().requestInitialMemory(0L, null);
            return null;
        }

        /** Logs to stderr, bumps {@link #startCount} and captures the parallelism. */
        @Override
        public void start() throws Exception {
            System.err.println("Out started");
            ++startCount;
            vertexParallelism = this.getContext().getVertexParallelism();
        }

        /** @return always {@code null}; this stub never writes */
        @Override
        public Writer getWriter() throws Exception {
            return null;
        }

        /** Ignores all events. */
        @Override
        public void handleEvents(List<Event> outputEvents) {
        }

        /** @return always {@code null}; no events are produced on close */
        @Override
        public List<Event> close() throws Exception {
            return null;
        }
    }

    /**
     * Stub input that declares itself ready during {@link #initialize()}, counts
     * {@link #start()} calls and remembers the vertex parallelism its context
     * reported at that moment.
     *
     * <p>The counters are plain {@code volatile} ints; {@code ++} on them is not
     * atomic, so they are only reliable while start() is not called concurrently.
     */
    public static class TestInput
    extends AbstractLogicalInput {
        /** Number of times {@link #start()} has been invoked on any instance. */
        public static volatile int startCount = 0;
        /** Parallelism read from the context during the most recent {@link #start()}. */
        public static volatile int vertexParallelism;

        public TestInput(InputContext inputContext, int numPhysicalInputs) {
            super(inputContext, numPhysicalInputs);
        }

        /**
         * Requests zero bytes of initial memory with no callback and immediately
         * signals readiness via {@code inputIsReady()}; emits no events.
         */
        @Override
        public List<Event> initialize() throws Exception {
            this.getContext().requestInitialMemory(0L, null);
            this.getContext().inputIsReady();
            return null;
        }

        /** Bumps {@link #startCount}, captures the parallelism and logs to stderr. */
        @Override
        public void start() throws Exception {
            ++startCount;
            vertexParallelism = this.getContext().getVertexParallelism();
            System.err.println("In started");
        }

        /** @return always {@code null}; this stub never reads */
        @Override
        public Reader getReader() throws Exception {
            return null;
        }

        /** Ignores all events. */
        @Override
        public void handleEvents(List<Event> inputEvents) throws Exception {
        }

        /** @return always {@code null}; no events are produced on close */
        @Override
        public List<Event> close() throws Exception {
            return null;
        }
    }

    /**
     * Stub processor whose only observable behavior is counting how many times
     * {@link #run(Map, Map)} has been invoked.
     */
    public static class TestProcessor
    extends AbstractLogicalIOProcessor {
        /** Number of times {@link #run(Map, Map)} has been invoked on any instance. */
        public static volatile int runCount = 0;

        public TestProcessor(ProcessorContext context) {
            super(context);
        }

        /** No-op. */
        @Override
        public void initialize() throws Exception {
        }

        /** Bumps {@link #runCount}; the inputs and outputs are not touched. */
        @Override
        public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
            ++runCount;
        }

        /** Ignores all events. */
        @Override
        public void handleEvents(List<Event> processorEvents) {
        }

        /** No-op. */
        @Override
        public void close() throws Exception {
        }
    }
}

