/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Arrays;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.PiEstimator;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniHDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.security.UserGroupInformation;

public class TestMiniMRWithDFS
extends TestCase {
    private static final Log LOG = LogFactory.getLog((String)TestMiniMRWithDFS.class.getName());
    static final int NUM_MAPS = 10;
    static final int NUM_SAMPLES = 100000;

    public static TestResult launchWordCount(JobConf conf, Path inDir, Path outDir, String input, int numMaps, int numReduces) throws IOException {
        FileSystem inFs = inDir.getFileSystem((Configuration)conf);
        FileSystem outFs = outDir.getFileSystem((Configuration)conf);
        outFs.delete(outDir, true);
        if (!inFs.mkdirs(inDir)) {
            throw new IOException("Mkdirs failed to create " + inDir.toString());
        }
        FSDataOutputStream file = inFs.create(new Path(inDir, "part-0"));
        file.writeBytes(input);
        file.close();
        conf.setJobName("wordcount");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WordCount.MapClass.class);
        conf.setCombinerClass(WordCount.Reduce.class);
        conf.setReducerClass(WordCount.Reduce.class);
        FileInputFormat.setInputPaths((JobConf)conf, (Path[])new Path[]{inDir});
        FileOutputFormat.setOutputPath((JobConf)conf, (Path)outDir);
        conf.setNumMapTasks(numMaps);
        conf.setNumReduceTasks(numReduces);
        RunningJob job = JobClient.runJob((JobConf)conf);
        return new TestResult(job, MapReduceTestUtil.readOutput(outDir, (Configuration)conf));
    }

    static void checkTaskDirectories(MiniMRCluster mr, String user, String[] jobIds, String[] taskDirs) {
        mr.waitUntilIdle();
        int trackers = mr.getNumTaskTrackers();
        ArrayList<String> observedJobDirs = new ArrayList<String>();
        ArrayList<String> observedFilesInsideJobDir = new ArrayList<String>();
        for (int i = 0; i < trackers; ++i) {
            File userDir;
            File localDir = new File(mr.getTaskTrackerLocalDir(i));
            TestMiniMRWithDFS.assertTrue((String)("Local dir " + localDir + " does not exist."), (boolean)localDir.isDirectory());
            LOG.info((Object)("Verifying contents of mapred.local.dir " + localDir.getAbsolutePath()));
            File trackerSubDir = new File(localDir, "taskTracker");
            if (!trackerSubDir.isDirectory() || !(userDir = new File(trackerSubDir, user)).isDirectory()) continue;
            LOG.info((Object)("Verifying contents of user-dir " + userDir.getAbsolutePath()));
            TestMiniMRWithDFS.verifyContents(new String[]{"jobcache", "distcache"}, userDir.list());
            File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir((String)user));
            String[] jobDirs = jobCacheDir.list();
            observedJobDirs.addAll(Arrays.asList(jobDirs));
            for (String jobDir : jobDirs) {
                String[] attemptDirs = new File(jobCacheDir, jobDir).list();
                observedFilesInsideJobDir.addAll(Arrays.asList(attemptDirs));
            }
        }
        LOG.info((Object)"Verifying the list of job directories");
        TestMiniMRWithDFS.verifyContents(jobIds, observedJobDirs.toArray(new String[observedJobDirs.size()]));
        LOG.info((Object)"Verifying the list of task directories");
        for (int j = 0; j < taskDirs.length; ++j) {
            TestMiniMRWithDFS.assertTrue((String)("Expected task-directory " + taskDirs[j] + " is not present!"), (boolean)observedFilesInsideJobDir.contains(taskDirs[j]));
        }
    }

    private static void verifyContents(String[] expectedFiles, String[] observedFiles) {
        int j;
        boolean[] foundExpectedFiles = new boolean[expectedFiles.length];
        boolean[] validObservedFiles = new boolean[observedFiles.length];
        for (j = 0; j < observedFiles.length; ++j) {
            for (int k = 0; k < expectedFiles.length; ++k) {
                if (!expectedFiles[k].equals(observedFiles[j])) continue;
                foundExpectedFiles[k] = true;
                validObservedFiles[j] = true;
            }
        }
        for (j = 0; j < foundExpectedFiles.length; ++j) {
            TestMiniMRWithDFS.assertTrue((String)("Expected file " + expectedFiles[j] + " not found"), (boolean)foundExpectedFiles[j]);
        }
        for (j = 0; j < validObservedFiles.length; ++j) {
            TestMiniMRWithDFS.assertTrue((String)("Unexpected file " + observedFiles[j] + " found"), (boolean)validObservedFiles[j]);
        }
    }

    public static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
        LOG.info((Object)"runPI");
        double estimate = PiEstimator.estimate((int)10, (long)100000L, (JobConf)jobconf).doubleValue();
        double error = Math.abs(Math.PI - estimate);
        TestMiniMRWithDFS.assertTrue((String)("Error in PI estimation " + error + " exceeds 0.01"), (error < 0.01 ? 1 : 0) != 0);
        String userName = UserGroupInformation.getLoginUser().getUserName();
        TestMiniMRWithDFS.checkTaskDirectories(mr, userName, new String[0], new String[0]);
    }

    public static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
        LOG.info((Object)"runWordCount");
        String pattern = TaskAttemptID.getTaskAttemptIDsPattern(null, null, (Boolean)true, (Integer)1, null);
        jobConf.setKeepTaskFilesPattern(pattern);
        Path inDir = new Path("./wc/input");
        Path outDir = new Path("./wc/output");
        String input = "The quick brown fox\nhas many silly\nred fox sox\n";
        TestResult result = TestMiniMRWithDFS.launchWordCount(jobConf, inDir, outDir, input, 3, 1);
        TestMiniMRWithDFS.assertEquals((String)"The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\nquick\t1\nred\t1\nsilly\t1\nsox\t1\n", (String)result.output);
        JobID jobid = result.job.getID();
        TaskAttemptID taskid = new TaskAttemptID(new TaskID((org.apache.hadoop.mapreduce.JobID)jobid, true, 1), 0);
        String userName = UserGroupInformation.getLoginUser().getUserName();
        TestMiniMRWithDFS.checkTaskDirectories(mr, userName, new String[]{jobid.toString()}, new String[]{taskid.toString()});
        jobConf = mr.createJobConf();
        input = "owen is oom";
        result = TestMiniMRWithDFS.launchWordCount(jobConf, inDir, outDir, input, 0, 1);
        TestMiniMRWithDFS.assertEquals((String)"is\t1\noom\t1\nowen\t1\n", (String)result.output);
        Counters counters = result.job.getCounters();
        long hdfsRead = counters.findCounter("FileSystemCounters", Task.getFileSystemCounterNames((String)"hdfs")[0]).getCounter();
        long hdfsWrite = counters.findCounter("FileSystemCounters", Task.getFileSystemCounterNames((String)"hdfs")[1]).getCounter();
        long rawSplitBytesRead = ((Counters.Counter)counters.findCounter((Enum)Task.Counter.SPLIT_RAW_BYTES)).getCounter();
        TestMiniMRWithDFS.assertEquals((long)result.output.length(), (long)hdfsWrite);
        TestMiniMRWithDFS.assertEquals((long)((long)input.length() + rawSplitBytesRead), (long)hdfsRead);
        LocalFileSystem localfs = FileSystem.getLocal((Configuration)jobConf);
        String TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp")).toString().replace(' ', '+');
        Path localIn = localfs.makeQualified(new Path(TEST_ROOT_DIR + "/local/in"));
        Path localOut = localfs.makeQualified(new Path(TEST_ROOT_DIR + "/local/out"));
        result = TestMiniMRWithDFS.launchWordCount(jobConf, localIn, localOut, "all your base belong to us", 1, 1);
        TestMiniMRWithDFS.assertEquals((String)"all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n", (String)result.output);
        TestMiniMRWithDFS.assertTrue((String)"outputs on localfs", (boolean)localfs.exists(localOut));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testWithDFS() throws IOException {
        MiniHDFSCluster dfs = null;
        MiniMRCluster mr = null;
        DistributedFileSystem fileSys = null;
        try {
            int taskTrackers = 4;
            Configuration conf = new Configuration();
            dfs = new MiniHDFSCluster(conf, 4, true, null);
            fileSys = dfs.getFileSystem();
            mr = new MiniMRCluster(4, fileSys.getUri().toString(), 1);
            mr.setInlineCleanupThreads();
            TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
            TestMiniMRWithDFS.runWordCount(mr, mr.createJobConf());
        }
        finally {
            if (dfs != null) {
                dfs.shutdown();
            }
            if (mr != null) {
                mr.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testWithDFSWithDefaultPort() throws IOException {
        MiniHDFSCluster dfs = null;
        MiniMRCluster mr = null;
        DistributedFileSystem fileSys = null;
        try {
            int taskTrackers = 4;
            Configuration conf = new Configuration();
            dfs = new MiniHDFSCluster(8020, conf, 4, true, true, null, null);
            fileSys = dfs.getFileSystem();
            mr = new MiniMRCluster(4, fileSys.getUri().toString(), 1);
            JobConf jobConf = mr.createJobConf();
            Path inDir = new Path("./wc/input");
            Path outDir = new Path("hdfs://" + dfs.getNameNode().getNameNodeAddress().getHostName() + ":" + 8020 + "/./wc/output");
            String input = "The quick brown fox\nhas many silly\nred fox sox\n";
            TestResult result = TestMiniMRWithDFS.launchWordCount(jobConf, inDir, outDir, input, 3, 1);
            TestMiniMRWithDFS.assertEquals((String)"The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\nquick\t1\nred\t1\nsilly\t1\nsox\t1\n", (String)result.output);
            Path outDir2 = new Path("hdfs:/test/wc/output2");
            jobConf.set("fs.default.name", "hdfs://localhost:8020");
            result = TestMiniMRWithDFS.launchWordCount(jobConf, inDir, outDir2, input, 3, 1);
            TestMiniMRWithDFS.assertEquals((String)"The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\nquick\t1\nred\t1\nsilly\t1\nsox\t1\n", (String)result.output);
        }
        catch (BindException be) {
            LOG.info((Object)"Skip the test this time because can not start namenode on port 8020", (Throwable)be);
        }
        finally {
            if (dfs != null) {
                dfs.shutdown();
            }
            if (mr != null) {
                mr.shutdown();
            }
        }
    }

    public static class TestResult {
        public String output;
        public RunningJob job;

        TestResult(RunningJob job, String output) {
            this.job = job;
            this.output = output;
        }
    }
}

