/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.processor.map;

import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestMapProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(TestMapProcessor.class);
    private static JobConf defaultConf = new JobConf();
    private static FileSystem localFs = null;
    private static Path workDir = null;
    static float progressUpdate = 0.0f;

    public void setUpJobConf(JobConf job) {
        job.set("tez.runtime.framework.local.dirs", workDir.toString());
        job.set("mapreduce.cluster.local.dir", workDir.toString());
        job.setClass("tez.runtime.task.local.output.manager", TezTaskOutputFiles.class, TezTaskOutput.class);
        job.set("tez.runtime.partitioner.class", MRPartitioner.class.getName());
        job.setNumReduceTasks(1);
    }

    private Path getMapOutputFile(Configuration jobConf, OutputContext outputContext) throws IOException {
        LocalDirAllocator lDirAlloc = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        Path attemptOutput = new Path(new Path("output", outputContext.getUniqueIdentifier()), "file.out");
        Path mapOutputFile = lDirAlloc.getLocalPathToRead(attemptOutput.toString(), jobConf);
        return mapOutputFile;
    }

    @Before
    @After
    public void cleanup() throws Exception {
        localFs.delete(workDir, true);
    }

    @Test(timeout=5000L)
    public void testMapProcessor() throws Exception {
        String dagName = "mrdag0";
        String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
        JobConf jobConf = new JobConf((Configuration)defaultConf);
        this.setUpJobConf(jobConf);
        MRHelpers.translateMRConfToTez((Configuration)jobConf);
        jobConf.setInt("mapreduce.job.application.attempt.id", 0);
        jobConf.setBoolean("mapreduce.tez.splits.via.events", false);
        jobConf.set("tez.mr.framework.task-local-resource.dir", new Path(workDir, "localized-resources").toUri().toString());
        Path mapInput = new Path(workDir, "map0");
        MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10);
        InputSpec mapInputSpec = new InputSpec("NullSrcVertex", (InputDescriptor)InputDescriptor.create((String)MRInputLegacy.class.getName()).setUserPayload(UserPayload.create((ByteBuffer)ByteBuffer.wrap(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder().setConfigurationBytes(TezUtils.createByteStringFromConf((Configuration)jobConf)).build().toByteArray()))), 1);
        OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", (OutputDescriptor)OutputDescriptor.create((String)OrderedPartitionedKVOutput.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)jobConf)), 1);
        TezSharedExecutor sharedExecutor = new TezSharedExecutor((Configuration)jobConf);
        LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0, new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName, Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec), sharedExecutor);
        task.initialize();
        task.run();
        task.close();
        sharedExecutor.shutdownNow();
        OutputContext outputContext = (OutputContext)task.getOutputContexts().iterator().next();
        TezTaskOutputFiles mapOutputs = new TezTaskOutputFiles((Configuration)jobConf, outputContext.getUniqueIdentifier(), outputContext.getDagIdentifier());
        Path mapOutputFile = this.getMapOutputFile((Configuration)jobConf, outputContext);
        LOG.info("mapOutputFile = " + mapOutputFile);
        IFile.Reader reader = new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
        LongWritable key = new LongWritable();
        Text value = new Text();
        DataInputBuffer keyBuf = new DataInputBuffer();
        DataInputBuffer valueBuf = new DataInputBuffer();
        long prev = Long.MIN_VALUE;
        while (reader.nextRawKey(keyBuf)) {
            reader.nextRawValue(valueBuf);
            key.readFields((DataInput)keyBuf);
            value.readFields((DataInput)valueBuf);
            if (prev != Long.MIN_VALUE) {
                assert (prev <= key.get());
                prev = key.get();
            }
            LOG.info("key = " + key.get() + "; value = " + value);
        }
        reader.close();
    }

    @Test(timeout=30000L)
    public void testMapProcessorProgress() throws Exception {
        String dagName = "mrdag0";
        String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
        JobConf jobConf = new JobConf((Configuration)defaultConf);
        this.setUpJobConf(jobConf);
        MRHelpers.translateMRConfToTez((Configuration)jobConf);
        jobConf.setInt("mapreduce.job.application.attempt.id", 0);
        jobConf.setBoolean("mapreduce.tez.splits.via.events", false);
        jobConf.set("tez.mr.framework.task-local-resource.dir", new Path(workDir, "localized-resources").toUri().toString());
        Path mapInput = new Path(workDir, "map0");
        MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 100000);
        InputSpec mapInputSpec = new InputSpec("NullSrcVertex", (InputDescriptor)InputDescriptor.create((String)MRInputLegacy.class.getName()).setUserPayload(UserPayload.create((ByteBuffer)ByteBuffer.wrap(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder().setConfigurationBytes(TezUtils.createByteStringFromConf((Configuration)jobConf)).build().toByteArray()))), 1);
        OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", (OutputDescriptor)OutputDescriptor.create((String)OrderedPartitionedKVOutput.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)jobConf)), 1);
        TezSharedExecutor sharedExecutor = new TezSharedExecutor((Configuration)jobConf);
        final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0, new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName, Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec), sharedExecutor);
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Thread monitorProgress = new Thread(new Runnable(){

            @Override
            public void run() {
                float prog = task.getProgress();
                if (prog > 0.0f && prog < 1.0f) {
                    progressUpdate = prog;
                }
            }
        });
        task.initialize();
        scheduler.scheduleAtFixedRate(monitorProgress, 0L, 1L, TimeUnit.MILLISECONDS);
        task.run();
        Assert.assertTrue((String)"Progress Updates should be captured!", (progressUpdate > 0.0f && progressUpdate < 1.0f ? 1 : 0) != 0);
        task.close();
        sharedExecutor.shutdownNow();
    }

    static {
        try {
            defaultConf.set("fs.defaultFS", "file:///");
            localFs = FileSystem.getLocal((Configuration)defaultConf);
            workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), "TestMapProcessor").makeQualified(localFs);
            LOG.info("Using workDir: " + workDir);
            MapUtils.configureLocalDirs((Configuration)defaultConf, workDir.toString());
        }
        catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }
}

