/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;

public class TestLocalRunner
extends TestCase {
    /** Logger for this test; also used by the nested {@code StressMapper}. */
    private static final Log LOG = LogFactory.getLog(TestLocalRunner.class);
    /**
     * Total number of records the mappers emit for the input built by
     * {@code createMultiMapsInput()}: each input file contributes
     * (lines in file) x (records {@code StressMapper} writes per line), i.e.
     * 50000*1 + 500*500 + 500*500 + 20*500 + 5000*500 + 500*500 = 3,310,000.
     */
    private static final int TOTAL_RECORDS = 3310000;
    /** Name of the directory that holds the generated input files. */
    private final String INPUT_DIR = "multiMapInput";
    /** Name of the directory the jobs write their output to. */
    private final String OUTPUT_DIR = "multiMapOutput";

    /**
     * Writes one input file into {@code dirPath} on the local file system.
     * The file is named after {@code id} and every line has the form
     * {@code "This is a line in a file: <id> <lineIndex>"}.
     *
     * @param dirPath    directory in which the file is created
     * @param id         numeric id used as the file name and embedded in every line
     * @param numRecords number of lines to write
     * @throws IOException if the file cannot be created or written
     */
    private void createInputFile(Path dirPath, int id, int numRecords) throws IOException {
        final String linePrefix = "This is a line in a file: ";
        Path filePath = new Path(dirPath, "" + id);
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        FSDataOutputStream os = fs.create(filePath);
        BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
        try {
            for (int i = 0; i < numRecords; ++i) {
                w.write(linePrefix + id + " " + i + "\n");
            }
        } finally {
            // Always release the file handle, even when a write fails part-way.
            w.close();
        }
    }

    /**
     * Resolves the directory that holds the generated input files.
     *
     * @return {@code INPUT_DIR} under the directory named by the
     *         {@code test.build.data} system property, or {@code INPUT_DIR}
     *         as a relative path when that property is not set
     */
    private Path getInputPath() {
        String dataDir = System.getProperty("test.build.data");
        if (null == dataDir) {
            return new Path(INPUT_DIR);
        }
        return new Path(new Path(dataDir), INPUT_DIR);
    }

    /**
     * Resolves the directory the jobs write their output to.
     *
     * @return {@code OUTPUT_DIR} under the directory named by the
     *         {@code test.build.data} system property, or {@code OUTPUT_DIR}
     *         as a relative path when that property is not set
     */
    private Path getOutputPath() {
        String dataDir = System.getProperty("test.build.data");
        if (null == dataDir) {
            return new Path(OUTPUT_DIR);
        }
        return new Path(new Path(dataDir), OUTPUT_DIR);
    }

    /**
     * Rebuilds the input directory from scratch: removes any existing copy and
     * then writes six files (ids 0 through 5) with differing line counts.
     *
     * @return the path of the freshly populated input directory
     * @throws IOException if the directory cannot be cleared or a file cannot be written
     */
    private Path createMultiMapsInput() throws IOException {
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        Path inputDir = getInputPath();
        if (localFs.exists(inputDir)) {
            localFs.delete(inputDir, true);
        }

        // Index in this table is the file id; value is the number of lines in that file.
        int[] linesPerFile = {50000, 500, 500, 20, 5000, 500};
        for (int fileId = 0; fileId < linesPerFile.length; ++fileId) {
            createInputFile(inputDir, fileId, linesPerFile[fileId]);
        }
        return inputDir;
    }

    /**
     * Checks the single reducer output file {@code part-r-00000}: its first
     * line must be the key {@code 0}, a tab, and a count equal to
     * {@code TOTAL_RECORDS}.
     *
     * @param outputPath job output directory
     * @throws IOException if the output file cannot be opened or read
     */
    private void verifyOutput(Path outputPath) throws IOException {
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        Path outputFile = new Path(outputPath, "part-r-00000");
        BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(outputFile)));
        try {
            String line = r.readLine();
            // An empty file would otherwise surface as a NullPointerException.
            assertNotNull("Output file is empty", line);
            line = line.trim();
            assertTrue("Line does not have correct key", line.startsWith("0\t"));
            // Skip the two-character "0\t" prefix; the rest is the count.
            int count = Integer.parseInt(line.substring(2));
            assertEquals("Incorrect count generated!", TOTAL_RECORDS, count);
        } finally {
            // Close the reader even when one of the assertions above fails.
            r.close();
        }
    }

    /**
     * Runs the stress mapper over the six generated input files with up to six
     * map tasks running concurrently in the local runner and one reducer, then
     * checks that the job succeeded and that the reducer counted every record.
     *
     * @throws Exception if job setup, execution or output verification fails
     */
    @Test
    public void testMultiMaps() throws Exception {
        Job job = new Job();
        Path inputPath = createMultiMapsInput();
        Path outputPath = getOutputPath();
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        job.setMapperClass(StressMapper.class);
        job.setReducerClass(CountingReducer.class);
        job.setNumReduceTasks(1);
        LocalJobRunner.setLocalMaxRunningMaps(job, 6);
        // NOTE(review): small sort-buffer settings, presumably to force map-side
        // spills under load - confirm against the Hadoop configuration docs.
        job.getConfiguration().set("io.sort.record.pct", "0.50");
        job.getConfiguration().set("io.sort.mb", "25");
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Fail with a clear message if the job itself did not complete,
        // instead of letting verifyOutput trip over missing output.
        boolean success = job.waitForCompletion(true);
        assertTrue("Multi-map job failed", success);
        verifyOutput(outputPath);
    }

    /**
     * Configures the local runner with a negative maximum number of running
     * maps (-6) and asserts that the job does not complete successfully.
     *
     * @throws Exception if job setup or execution throws
     */
    @Test
    public void testInvalidMultiMapParallelism() throws Exception {
        Job badJob = new Job();
        Path in = createMultiMapsInput();
        Path out = getOutputPath();

        // Start from a clean output directory.
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        if (localFs.exists(out)) {
            localFs.delete(out, true);
        }

        badJob.setMapperClass(StressMapper.class);
        badJob.setReducerClass(CountingReducer.class);
        badJob.setNumReduceTasks(1);
        LocalJobRunner.setLocalMaxRunningMaps(badJob, -6);
        FileInputFormat.addInputPath(badJob, in);
        FileOutputFormat.setOutputPath(badJob, out);

        boolean completed = badJob.waitForCompletion(true);
        assertFalse("Job succeeded somehow", completed);
    }

    /**
     * Runs a job whose input format produces no splits (and therefore no map
     * tasks) with a single reducer, and asserts that the job still succeeds.
     *
     * @throws Exception if job setup or execution throws
     */
    @Test
    public void testEmptyMaps() throws Exception {
        Job job = new Job();
        Path outputPath = getOutputPath();
        Configuration conf = new Configuration();
        LocalFileSystem fs = FileSystem.getLocal(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        job.setInputFormatClass(EmptyInputFormat.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean success = job.waitForCompletion(true);
        assertTrue("Empty job should work", success);
    }

    /**
     * Record reader that never yields a record: {@link #nextKeyValue()} always
     * returns {@code false}. The remaining methods return placeholder values.
     */
    private static class EmptyRecordReader
    extends RecordReader<Object, Object> {
        private EmptyRecordReader() {
        }

        /** No state to set up; the split and context are ignored. */
        public void initialize(InputSplit split, TaskAttemptContext context) {
        }

        /** @return always {@code false}, i.e. there is never a next record */
        public boolean nextKeyValue() {
            return false;
        }

        /** @return a fresh placeholder object on every call */
        public Object getCurrentKey() {
            Object placeholderKey = new Object();
            return placeholderKey;
        }

        /** @return a fresh placeholder object on every call */
        public Object getCurrentValue() {
            Object placeholderValue = new Object();
            return placeholderValue;
        }

        /** @return always zero */
        public float getProgress() {
            return 0.0f;
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Input format that reports no input splits and hands out
     * {@code EmptyRecordReader} instances.
     */
    private static class EmptyInputFormat
    extends InputFormat<Object, Object> {
        private EmptyInputFormat() {
        }

        /** @return a reader that never produces a record; both arguments are ignored */
        public RecordReader<Object, Object> createRecordReader(InputSplit split, TaskAttemptContext context) {
            RecordReader<Object, Object> reader = new EmptyRecordReader();
            return reader;
        }

        /** @return a new, empty, mutable list of splits */
        public List<InputSplit> getSplits(JobContext context) {
            List<InputSplit> noSplits = new ArrayList<InputSplit>();
            return noSplits;
        }
    }

    private static class CountingReducer
    extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
        private CountingReducer() {
        }

        public void reduce(LongWritable key, Iterable<Text> vals, Reducer.Context context) throws IOException, InterruptedException {
            long out = 0L;
            for (Text val : vals) {
                ++out;
            }
            context.write((Object)key, (Object)new LongWritable(out));
        }
    }

    private static class StressMapper
    extends Mapper<LongWritable, Text, LongWritable, Text> {
        private int threadId;
        public long exposedState;

        private StressMapper() {
        }

        protected void setup(Mapper.Context context) {
            FileSplit split = (FileSplit)context.getInputSplit();
            Path filePath = split.getPath();
            String name = filePath.getName();
            this.threadId = Integer.valueOf(name);
            LOG.info((Object)("Thread " + this.threadId + " : " + context.getInputSplit()));
        }

        public void map(LongWritable key, Text val, Mapper.Context c) throws IOException, InterruptedException {
            switch (this.threadId) {
                case 0: {
                    c.write((Object)new LongWritable(0L), (Object)val);
                    break;
                }
                case 1: 
                case 2: {
                    for (int i = 0; i < 500; ++i) {
                        c.write((Object)new LongWritable(0L), (Object)val);
                    }
                    break;
                }
                case 3: {
                    for (int i = 0; i < 50; ++i) {
                        for (int j = 0; j < 10; ++j) {
                            c.write((Object)new LongWritable(0L), (Object)val);
                        }
                        Thread.sleep(1L);
                    }
                    break;
                }
                case 4: {
                    for (int i = 0; i < 500; ++i) {
                        for (int j = 0; j < 10000; ++j) {
                            ++this.exposedState;
                        }
                        c.write((Object)new LongWritable(0L), (Object)val);
                    }
                    break;
                }
                case 5: {
                    for (int i = 0; i < 500; ++i) {
                        for (int j = 0; j < 100000; ++j) {
                            ++this.exposedState;
                        }
                        c.write((Object)new LongWritable(0L), (Object)val);
                    }
                    break;
                }
                default: {
                    c.write((Object)new LongWritable(0L), (Object)val);
                }
            }
        }

        protected void cleanup(Mapper.Context context) {
            LOG.debug((Object)("Busy loop counter: " + this.exposedState));
        }
    }
}

