/*
 * Decompiled with CFR 0.152.
 */
package org.apache.whirr.examples;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Map;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;
import org.apache.whirr.Cluster;
import org.apache.whirr.ClusterController;
import org.apache.whirr.ClusterControllerFactory;
import org.apache.whirr.ClusterSpec;
import org.apache.whirr.examples.Example;
import org.apache.whirr.service.hadoop.HadoopProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopClusterExample
extends Example {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopClusterExample.class);

    @Override
    public String getName() {
        return "hadoop-cluster";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int main(String[] args) throws Exception {
        if (!System.getenv().containsKey("AWS_ACCESS_KEY_ID")) {
            LOG.error("AWS_ACCESS_KEY_ID is undefined in the current environment");
            return -1;
        }
        if (!System.getenv().containsKey("AWS_SECRET_ACCESS_KEY")) {
            LOG.error("AWS_SECRET_ACCESS_KEY is undefined in the current environment");
            return -2;
        }
        ClusterSpec spec = new ClusterSpec((Configuration)new PropertiesConfiguration("whirr-hadoop-example.properties"));
        ClusterControllerFactory factory = new ClusterControllerFactory();
        ClusterController controller = factory.create(spec.getServiceName());
        HadoopProxy proxy = null;
        try {
            LOG.info("Starting cluster {}", (Object)spec.getClusterName());
            Cluster cluster = controller.launchCluster(spec);
            LOG.info("Starting local SOCKS proxy");
            proxy = new HadoopProxy(spec, cluster);
            proxy.start();
            org.apache.hadoop.conf.Configuration config = this.getHadoopConfiguration(cluster);
            JobConf job = new JobConf(config, HadoopClusterExample.class);
            JobClient client = new JobClient(job);
            this.waitToExitSafeMode(client);
            this.waitForTaskTrackers(client);
            this.runWordCountingJob(config);
        }
        finally {
            if (proxy != null) {
                proxy.stop();
            }
            controller.destroyCluster(spec);
            return 0;
        }
    }

    private void runWordCountingJob(org.apache.hadoop.conf.Configuration config) throws IOException {
        JobConf job = new JobConf(config, HadoopClusterExample.class);
        FileSystem fs = FileSystem.get((org.apache.hadoop.conf.Configuration)config);
        FSDataOutputStream os = fs.create(new Path("input"));
        OutputStreamWriter wr = new OutputStreamWriter((OutputStream)os);
        wr.write("b a\n");
        ((Writer)wr).close();
        LOG.info("Wrote a file containing 'b a\\n'");
        job.setMapperClass(TokenCountMapper.class);
        job.setReducerClass(LongSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths((JobConf)job, (Path[])new Path[]{new Path("input")});
        FileOutputFormat.setOutputPath((JobConf)job, (Path)new Path("output"));
        JobClient.runJob((JobConf)job);
        FSDataInputStream in = fs.open(new Path("output/part-00000"));
        BufferedReader reader = new BufferedReader(new InputStreamReader((InputStream)in));
        String line = reader.readLine();
        int count = 0;
        while (line != null) {
            LOG.info("Line {}: {}", (Object)count, (Object)line);
            ++count;
            line = reader.readLine();
        }
        reader.close();
    }

    private org.apache.hadoop.conf.Configuration getHadoopConfiguration(Cluster cluster) {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        for (Map.Entry<Object, Object> entry : cluster.getConfiguration().entrySet()) {
            conf.set(entry.getKey().toString(), entry.getValue().toString());
        }
        return conf;
    }

    private void waitToExitSafeMode(JobClient client) throws IOException {
        LOG.info("Waiting to exit safe mode...");
        FileSystem fs = client.getFs();
        DistributedFileSystem dfs = (DistributedFileSystem)fs;
        boolean inSafeMode = true;
        while (inSafeMode) {
            inSafeMode = dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET);
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // empty catch block
                break;
            }
        }
        LOG.info("Exited safe mode");
    }

    private void waitForTaskTrackers(JobClient client) throws IOException {
        LOG.info("Waiting for tasktrackers...");
        while (true) {
            ClusterStatus clusterStatus;
            int taskTrackerCount;
            if ((taskTrackerCount = (clusterStatus = client.getClusterStatus()).getTaskTrackers()) > 0) {
                LOG.info("{} tasktrackers reported in. Continuing.", (Object)taskTrackerCount);
                break;
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                break;
            }
        }
    }
}

