package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-mapreduce-client-jobclient-2.5.1-mapr-1503-tests.jar:org/apache/hadoop/mapreduce/TestLocalRunner.class
 */
/* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/TestLocalRunner.class */
public class TestLocalRunner extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestLocalRunner.class);
    private static int[] INPUT_SIZES = {50000, 500, 500, 20, 5000, 500};
    private static int[] OUTPUT_SIZES = {1, 500, 500, 500, 500, 500};
    private static int[] SLEEP_INTERVALS = {10000, 15, 15, 20, 250, 60};
    private static int TOTAL_RECORDS = 0;
    private final String INPUT_DIR = "multiMapInput";
    private final String OUTPUT_DIR = "multiMapOutput";
    private static final int NUMBER_FILE_VAL = 100;

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-jobclient-2.5.1-mapr-1503-tests.jar:org/apache/hadoop/mapreduce/TestLocalRunner$CountingReducer.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/TestLocalRunner$CountingReducer.class */
    private static class CountingReducer extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
        private CountingReducer() {
        }

        public void reduce(LongWritable longWritable, Iterable<Text> iterable, Reducer<LongWritable, Text, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
            long j = 0;
            for (Text text : iterable) {
                j++;
            }
            context.write(longWritable, new LongWritable(j));
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((LongWritable) obj, (Iterable<Text>) iterable, (Reducer<LongWritable, Text, LongWritable, LongWritable>.Context) context);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-jobclient-2.5.1-mapr-1503-tests.jar:org/apache/hadoop/mapreduce/TestLocalRunner$EmptyInputFormat.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/TestLocalRunner$EmptyInputFormat.class */
    private static class EmptyInputFormat extends InputFormat<Object, Object> {
        private EmptyInputFormat() {
        }

        public List<InputSplit> getSplits(JobContext jobContext) {
            return new ArrayList();
        }

        public RecordReader<Object, Object> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
            return new EmptyRecordReader();
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-jobclient-2.5.1-mapr-1503-tests.jar:org/apache/hadoop/mapreduce/TestLocalRunner$EmptyRecordReader.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/TestLocalRunner$EmptyRecordReader.class */
    private static class EmptyRecordReader extends RecordReader<Object, Object> {
        private EmptyRecordReader() {
        }

        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
        }

        public Object getCurrentKey() {
            return new Object();
        }

        public Object getCurrentValue() {
            return new Object();
        }

        public float getProgress() {
            return 0.0f;
        }

        public void close() {
        }

        public boolean nextKeyValue() {
            return false;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-jobclient-2.5.1-mapr-1503-tests.jar:org/apache/hadoop/mapreduce/TestLocalRunner$GCMapper.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/TestLocalRunner$GCMapper.class */
    private static class GCMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        private GCMapper() {
        }

        public void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 20000; i++) {
                arrayList.add(new Integer(i));
            }
            int i2 = 0;
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                i2 += ((Integer) it.next()).intValue();
            }
            System.gc();
            context.write(new LongWritable(i2), text);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (Text) obj2, (Mapper<LongWritable, Text, LongWritable, Text>.Context) context);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-jobclient-2.5.1-mapr-1503-tests.jar:org/apache/hadoop/mapreduce/TestLocalRunner$SequenceMapper.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/TestLocalRunner$SequenceMapper.class */
    public static class SequenceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        public void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            int intValue = Integer.valueOf(text.toString()).intValue();
            for (int i = 0; i < intValue; i++) {
                context.write(new Text("" + i), NullWritable.get());
            }
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (Text) obj2, (Mapper<LongWritable, Text, Text, NullWritable>.Context) context);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-jobclient-2.5.1-mapr-1503-tests.jar:org/apache/hadoop/mapreduce/TestLocalRunner$StressMapper.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/TestLocalRunner$StressMapper.class */
    private static class StressMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        private int threadId;
        public long exposedState;

        private StressMapper() {
        }

        protected void setup(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
            this.threadId = Integer.valueOf(context.getInputSplit().getPath().getName()).intValue();
            TestLocalRunner.LOG.info("Thread " + this.threadId + " : " + context.getInputSplit());
        }

        public void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
            for (int i = 0; i < TestLocalRunner.OUTPUT_SIZES[this.threadId]; i++) {
                context.write(new LongWritable(0L), text);
                if (i % TestLocalRunner.SLEEP_INTERVALS[this.threadId] == 1) {
                    Thread.sleep(1L);
                }
            }
        }

        protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
            TestLocalRunner.LOG.debug("Busy loop counter: " + this.exposedState);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (Text) obj2, (Mapper<LongWritable, Text, LongWritable, Text>.Context) context);
        }
    }

    private void createInputFile(Path path, int i, int i2) throws IOException {
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(FileSystem.getLocal(new Configuration()).create(new Path(path, "" + i))));
        for (int i3 = 0; i3 < i2; i3++) {
            bufferedWriter.write("This is a line in a file: " + i + " " + i3 + "\n");
        }
        bufferedWriter.close();
    }

    private Path getInputPath() {
        String property = System.getProperty("test.build.data");
        return null == property ? new Path("multiMapInput") : new Path(new Path(property), "multiMapInput");
    }

    private Path getOutputPath() {
        String property = System.getProperty("test.build.data");
        return null == property ? new Path("multiMapOutput") : new Path(new Path(property), "multiMapOutput");
    }

    private Path createMultiMapsInput() throws IOException {
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        Path inputPath = getInputPath();
        if (local.exists(inputPath)) {
            local.delete(inputPath, true);
        }
        for (int i = 0; i < 6; i++) {
            createInputFile(inputPath, i, INPUT_SIZES[i]);
        }
        return inputPath;
    }

    private void verifyOutput(Path path) throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(FileSystem.getLocal(new Configuration()).open(new Path(path, "part-r-00000"))));
        String trim = bufferedReader.readLine().trim();
        assertTrue("Line does not have correct key", trim.startsWith("0\t"));
        assertEquals("Incorrect count generated!", TOTAL_RECORDS, Integer.valueOf(trim.substring(2)).intValue());
        bufferedReader.close();
    }

    @Test
    public void testGcCounter() throws Exception {
        Path inputPath = getInputPath();
        Path outputPath = getOutputPath();
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        if (local.exists(outputPath)) {
            local.delete(outputPath, true);
        }
        if (local.exists(inputPath)) {
            local.delete(inputPath, true);
        }
        createInputFile(inputPath, 0, 20);
        Job job = Job.getInstance();
        job.setMapperClass(GCMapper.class);
        job.setNumReduceTasks(0);
        job.getConfiguration().set("mapreduce.task.io.sort.mb", "25");
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        assertTrue("job failed", job.waitForCompletion(true));
        Counter findCounter = job.getCounters().findCounter(TaskCounter.GC_TIME_MILLIS);
        assertNotNull(findCounter);
        assertTrue("No time spent in gc", findCounter.getValue() > 0);
    }

    @Test(timeout = 120000)
    public void testMultiMaps() throws Exception {
        Job job = Job.getInstance();
        Path createMultiMapsInput = createMultiMapsInput();
        Path outputPath = getOutputPath();
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        if (local.exists(outputPath)) {
            local.delete(outputPath, true);
        }
        job.setMapperClass(StressMapper.class);
        job.setReducerClass(CountingReducer.class);
        job.setNumReduceTasks(1);
        LocalJobRunner.setLocalMaxRunningMaps(job, 6);
        job.getConfiguration().set("mapreduce.task.io.sort.mb", "25");
        FileInputFormat.addInputPath(job, createMultiMapsInput);
        FileOutputFormat.setOutputPath(job, outputPath);
        final Thread currentThread = Thread.currentThread();
        Thread thread = new Thread() { // from class: org.apache.hadoop.mapreduce.TestLocalRunner.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    Thread.sleep(120000L);
                    currentThread.interrupt();
                } catch (InterruptedException e) {
                }
            }
        };
        LOG.info("Submitting job...");
        job.submit();
        LOG.info("Starting thread to interrupt main thread in 2 minutes");
        thread.start();
        LOG.info("Waiting for job to complete...");
        try {
            job.waitForCompletion(true);
            LOG.info("Job completed, stopping interrupter");
            thread.interrupt();
            try {
                thread.join();
            } catch (InterruptedException e) {
            }
            LOG.info("Verifying output");
            verifyOutput(outputPath);
        } catch (InterruptedException e2) {
            LOG.fatal("Interrupted while waiting for job completion", e2);
            for (int i = 0; i < 10; i++) {
                LOG.fatal("Dumping stacks");
                ReflectionUtils.logThreadInfo(LOG, "multimap threads", 0L);
                Thread.sleep(1000L);
            }
            throw e2;
        }
    }

    @Test
    public void testInvalidMultiMapParallelism() throws Exception {
        Job job = Job.getInstance();
        Path createMultiMapsInput = createMultiMapsInput();
        Path outputPath = getOutputPath();
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        if (local.exists(outputPath)) {
            local.delete(outputPath, true);
        }
        job.setMapperClass(StressMapper.class);
        job.setReducerClass(CountingReducer.class);
        job.setNumReduceTasks(1);
        LocalJobRunner.setLocalMaxRunningMaps(job, -6);
        FileInputFormat.addInputPath(job, createMultiMapsInput);
        FileOutputFormat.setOutputPath(job, outputPath);
        assertFalse("Job succeeded somehow", job.waitForCompletion(true));
    }

    @Test
    public void testEmptyMaps() throws Exception {
        Job job = Job.getInstance();
        Path outputPath = getOutputPath();
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        if (local.exists(outputPath)) {
            local.delete(outputPath, true);
        }
        job.setInputFormatClass(EmptyInputFormat.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, outputPath);
        assertTrue("Empty job should work", job.waitForCompletion(true));
    }

    private Path getNumberDirPath() {
        return new Path(getInputPath(), "numberfiles");
    }

    private Path makeNumberFile(int i, int i2) throws IOException {
        Path path = new Path(getNumberDirPath(), "file" + i);
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(FileSystem.getLocal(new Configuration()).create(path)));
        bufferedWriter.write("" + i2);
        bufferedWriter.close();
        return path;
    }

    private void verifyNumberJob(int i) throws Exception {
        Path outputPath = getOutputPath();
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        int i2 = 0;
        for (FileStatus fileStatus : local.listStatus(outputPath)) {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(local.open(fileStatus.getPath())));
            while (true) {
                String readLine = bufferedReader.readLine();
                if (readLine != null) {
                    i2 += Integer.valueOf(readLine.trim()).intValue();
                }
            }
            bufferedReader.close();
        }
        int i3 = ((99 * (99 + 1)) / 2) * i;
        LOG.info("expected sum: " + i3 + ", got " + i2);
        assertEquals("Didn't get all our results back", i3, i2);
    }

    private void doMultiReducerTest(int i, int i2, int i3, int i4) throws Exception {
        Path numberDirPath = getNumberDirPath();
        Path outputPath = getOutputPath();
        LocalFileSystem local = FileSystem.getLocal(new Configuration());
        if (local.exists(outputPath)) {
            local.delete(outputPath, true);
        }
        if (local.exists(numberDirPath)) {
            local.delete(numberDirPath, true);
        }
        for (int i5 = 0; i5 < i; i5++) {
            makeNumberFile(i5, NUMBER_FILE_VAL);
        }
        Job job = Job.getInstance();
        job.setNumReduceTasks(i2);
        job.setMapperClass(SequenceMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, numberDirPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        LocalJobRunner.setLocalMaxRunningMaps(job, i3);
        LocalJobRunner.setLocalMaxRunningReduces(job, i4);
        assertTrue("Job failed!!", job.waitForCompletion(true));
        verifyNumberJob(i);
    }

    @Test
    public void testOneMapMultiReduce() throws Exception {
        doMultiReducerTest(1, 2, 1, 1);
    }

    @Test
    public void testOneMapMultiParallelReduce() throws Exception {
        doMultiReducerTest(1, 2, 1, 2);
    }

    @Test
    public void testMultiMapOneReduce() throws Exception {
        doMultiReducerTest(4, 1, 2, 1);
    }

    @Test
    public void testMultiMapMultiReduce() throws Exception {
        doMultiReducerTest(4, 4, 2, 2);
    }

    static {
        for (int i = 0; i < 6; i++) {
            TOTAL_RECORDS += INPUT_SIZES[i] * OUTPUT_SIZES[i];
        }
    }
}
