/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.processor.reduce;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.IDConverter;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.apache.tez.runtime.library.input.LocalMergedInput;
import org.apache.tez.runtime.library.output.LocalOnFileSorterOutput;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Runs a map task followed by a {@link ReduceProcessor} task against the local
 * file system and verifies that the keys in the reducer's output file are in
 * ascending order.
 */
public class TestReduceProcessor {
    private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class);
    private static final JobConf defaultConf = new JobConf();
    private static final FileSystem localFs;
    private static final Path workDir;

    static {
        try {
            defaultConf.set("fs.defaultFS", "file:///");
            localFs = FileSystem.getLocal(defaultConf);
            // Scratch directory: <test.build.data or /tmp>/TestReduceProcessor.
            workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), "TestReduceProcessor").makeQualified(localFs);
            LOG.info("Using workDir: " + workDir);
            MapUtils.configureLocalDirs(defaultConf, workDir.toString());
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }

    /**
     * Points the job's local directories at the test work directory, selects
     * the local task-output manager and the MR partitioner, and configures a
     * single reduce task.
     *
     * @param job the job configuration to modify in place
     */
    public void setUpJobConf(JobConf job) {
        job.set("tez.runtime.local.dirs", workDir.toString());
        job.set("mapreduce.cluster.local.dir", workDir.toString());
        job.setClass("tez.runtime.task.local.output.manager", TezLocalTaskOutputFiles.class, TezTaskOutput.class);
        job.set("tez.runtime.partitioner.class", MRPartitioner.class.getName());
        job.setNumReduceTasks(1);
    }

    /**
     * Recursively deletes the work directory. Registered as both
     * {@code @Before} and {@code @After}, so it runs on each side of every test.
     *
     * @throws Exception if the delete fails
     */
    @Before
    @After
    public void cleanup() throws Exception {
        localFs.delete(workDir, true);
    }

    /**
     * Runs one map task and one reduce task in sequence, then checks the
     * ordering of the keys in the reducer's output file.
     *
     * @throws Exception if task setup, execution or output verification fails
     */
    @Test
    public void testReduceProcessor() throws Exception {
        String dagName = "mrdag0";
        String mapVertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
        String reduceVertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
        String localizedResources = new Path(workDir, "localized-resources").toUri().toString();

        JobConf jobConf = new JobConf(defaultConf);
        setUpJobConf(jobConf);
        Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
        conf.setInt("mapreduce.job.application.attempt.id", 0);

        // ---- map stage ----
        JobConf mapConf = new JobConf(MultiStageMRConfigUtil.getConfForVertex(conf, mapVertexName));
        mapConf.set("tez.runtime.task-local-resource.dir", localizedResources);
        mapConf.setBoolean("mapreduce.tez.splits.via.events", false);
        Path mapInput = new Path(workDir, "map0");
        MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
        InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()).setUserPayload(MRHelpers.createMRInputPayload(mapConf, null)), 0);
        OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
        LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0, mapInput, new TestUmbilical(), dagName, mapVertexName, Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec));
        mapTask.initialize();
        mapTask.run();
        mapTask.close();

        // ---- reduce stage ----
        LOG.info("Starting reduce...");
        // Raw Token kept as in the original: the identifier type expected by
        // ShuffleUtils.convertJobTokenToBytes is not visible from this file.
        Token shuffleToken = new Token();
        JobConf reduceConf = new JobConf(MultiStageMRConfigUtil.getConfForVertex(conf, reduceVertexName));
        reduceConf.setOutputFormat(SequenceFileOutputFormat.class);
        reduceConf.set("tez.runtime.task-local-resource.dir", localizedResources);
        Path outputPath = new Path(workDir, "output");
        FileOutputFormat.setOutputPath(reduceConf, outputPath);
        ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
        InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
        OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(MROutputLegacy.class.getName()), 1);
        TaskSpec taskSpec = new TaskSpec(TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), dagName, reduceVertexName, reduceProcessorDesc, Collections.singletonList(reduceInputSpec), Collections.singletonList(reduceOutputSpec), null);
        HashMap<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
        serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ShuffleUtils.convertJobTokenToBytes(shuffleToken));
        // Raw Multimap cast kept as in the original: the constructor's generic
        // parameter type is not visible from this file.
        LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(taskSpec, 0, reduceConf, new TestUmbilical(), serviceConsumerMetadata, (Multimap)HashMultimap.create());
        task.initialize();
        task.run();
        task.close();

        // ---- verification ----
        Path reduceOutputDir = new Path(outputPath, "_temporary/0/" + IDConverter.toMRTaskIdForOutput(TezTestUtils.getMockTaskId(0, 1, 0)));
        Path reduceOutputFile = new Path(reduceOutputDir, "part-v001-o000-00000");
        assertKeysNonDecreasing(reduceOutputFile, reduceConf);
    }

    /**
     * Reads every record of the given sequence file and asserts that each key
     * is greater than or equal to the one before it.
     *
     * @param file sequence file holding LongWritable keys and Text values
     * @param conf configuration used to open the file
     * @throws IOException if the file cannot be opened or read
     */
    private static void assertKeysNonDecreasing(Path file, Configuration conf) throws IOException {
        SequenceFile.Reader reader = new SequenceFile.Reader(localFs, file, conf);
        try {
            LongWritable key = new LongWritable();
            Text value = new Text();
            // Long.MIN_VALUE is <= any key, so the first record always passes.
            long prev = Long.MIN_VALUE;
            while (reader.next(key, value)) {
                long current = key.get();
                // Non-strict comparison: nothing here guarantees unique keys,
                // and equal adjacent keys are still correctly sorted.
                Assert.assertTrue("Reducer output out of order: " + prev + " precedes " + current, prev <= current);
                prev = current;
            }
        } finally {
            // Close the reader even when an assertion or read fails.
            reader.close();
        }
    }
}

