package org.apache.whirr.examples;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.Map;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;
import org.apache.whirr.Cluster;
import org.apache.whirr.ClusterController;
import org.apache.whirr.ClusterControllerFactory;
import org.apache.whirr.ClusterSpec;
import org.apache.whirr.service.hadoop.HadoopProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/whirr/examples/HadoopClusterExample.class */
public class HadoopClusterExample extends Example {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopClusterExample.class);

    @Override // org.apache.whirr.examples.Example
    public String getName() {
        return "hadoop-cluster";
    }

    @Override // org.apache.whirr.examples.Example
    public int main(String[] strArr) throws Exception {
        if (!System.getenv().containsKey("AWS_ACCESS_KEY_ID")) {
            LOG.error("AWS_ACCESS_KEY_ID is undefined in the current environment");
            return -1;
        }
        if (!System.getenv().containsKey("AWS_SECRET_ACCESS_KEY")) {
            LOG.error("AWS_SECRET_ACCESS_KEY is undefined in the current environment");
            return -2;
        }
        ClusterSpec clusterSpec = new ClusterSpec(new PropertiesConfiguration("whirr-hadoop-example.properties"));
        ClusterController create = new ClusterControllerFactory().create(clusterSpec.getServiceName());
        HadoopProxy hadoopProxy = null;
        try {
            LOG.info("Starting cluster {}", clusterSpec.getClusterName());
            Cluster launchCluster = create.launchCluster(clusterSpec);
            LOG.info("Starting local SOCKS proxy");
            hadoopProxy = new HadoopProxy(clusterSpec, launchCluster);
            hadoopProxy.start();
            Configuration hadoopConfiguration = getHadoopConfiguration(launchCluster);
            JobClient jobClient = new JobClient(new JobConf(hadoopConfiguration, HadoopClusterExample.class));
            waitToExitSafeMode(jobClient);
            waitForTaskTrackers(jobClient);
            runWordCountingJob(hadoopConfiguration);
            if (hadoopProxy != null) {
                hadoopProxy.stop();
            }
            create.destroyCluster(clusterSpec);
            return 0;
        } catch (Throwable th) {
            if (hadoopProxy != null) {
                hadoopProxy.stop();
            }
            create.destroyCluster(clusterSpec);
            return 0;
        }
    }

    private void runWordCountingJob(Configuration configuration) throws IOException {
        JobConf jobConf = new JobConf(configuration, HadoopClusterExample.class);
        FileSystem fileSystem = FileSystem.get(configuration);
        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(fileSystem.create(new Path("input")));
        outputStreamWriter.write("b a\n");
        outputStreamWriter.close();
        LOG.info("Wrote a file containing 'b a\\n'");
        jobConf.setMapperClass(TokenCountMapper.class);
        jobConf.setReducerClass(LongSumReducer.class);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(jobConf, new Path[]{new Path("input")});
        FileOutputFormat.setOutputPath(jobConf, new Path("output"));
        JobClient.runJob(jobConf);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path("output/part-00000"))));
        int i = 0;
        for (String readLine = bufferedReader.readLine(); readLine != null; readLine = bufferedReader.readLine()) {
            LOG.info("Line {}: {}", Integer.valueOf(i), readLine);
            i++;
        }
        bufferedReader.close();
    }

    private Configuration getHadoopConfiguration(Cluster cluster) {
        Configuration configuration = new Configuration();
        for (Map.Entry entry : cluster.getConfiguration().entrySet()) {
            configuration.set(entry.getKey().toString(), entry.getValue().toString());
        }
        return configuration;
    }

    private void waitToExitSafeMode(JobClient jobClient) throws IOException {
        LOG.info("Waiting to exit safe mode...");
        DistributedFileSystem fs = jobClient.getFs();
        boolean z = true;
        while (z) {
            z = fs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
        LOG.info("Exited safe mode");
    }

    private void waitForTaskTrackers(JobClient jobClient) throws IOException {
        LOG.info("Waiting for tasktrackers...");
        while (true) {
            int taskTrackers = jobClient.getClusterStatus().getTaskTrackers();
            if (taskTrackers > 0) {
                LOG.info("{} tasktrackers reported in. Continuing.", Integer.valueOf(taskTrackers));
                return;
            } else {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }
}
