/*
 * Decompiled with CFR 0.152.
 */
package org.apache.whirr.service.hama.integration;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPJobID;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.RunningJob;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.examples.PiEstimator;
import org.apache.whirr.service.hama.integration.HamaServiceController;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HamaServiceTest {
    private static final Logger LOG = LoggerFactory.getLogger(HamaServiceTest.class);
    private static HamaServiceController controller = HamaServiceController.getInstance();
    private static final Path TMP_OUTPUT = new Path("/pi-" + System.currentTimeMillis());

    @BeforeClass
    public static void setUp() throws Exception {
        controller.ensureClusterRunning();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        controller.shutdown();
    }

    @Test(timeout=900000L)
    public void test() throws Exception {
        HamaConfiguration jobConf = controller.getConfiguration();
        jobConf.set("hadoop.rpc.socket.factory.class.default", "org.apache.hadoop.net.StandardSocketFactory");
        BSPJob bsp = new BSPJob(jobConf, new BSPJobID());
        LOG.info("Job conf: " + bsp.getConf().get("hadoop.rpc.socket.factory.class.default") + ", " + bsp.getJobID().toString());
        bsp.setJarByClass(PiEstimator.MyEstimator.class);
        bsp.setBspClass(PiEstimator.MyEstimator.class);
        bsp.setInputFormat(NullInputFormat.class);
        bsp.setOutputKeyClass(Text.class);
        bsp.setOutputValueClass(DoubleWritable.class);
        bsp.setOutputFormat(TextOutputFormat.class);
        bsp.set("bsp.working.dir", "/tmp");
        FileOutputFormat.setOutputPath((BSPJob)bsp, (Path)TMP_OUTPUT);
        LOG.info("Client configuration start ..");
        HamaConfiguration clientConf = controller.getConfiguration();
        BSPJobClient jobClient = new BSPJobClient((Configuration)clientConf);
        ClusterStatus cluster = jobClient.getClusterStatus(true);
        Assert.assertNotNull((Object)cluster);
        Assert.assertTrue((cluster.getGroomServers() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((cluster.getMaxTasks() > 1 ? 1 : 0) != 0);
        bsp.setNumBspTask(cluster.getMaxTasks());
        LOG.info("Client conf: " + clientConf.get("hadoop.rpc.socket.factory.class.default"));
        RunningJob rJob = jobClient.submitJob(bsp);
        rJob.waitForCompletion();
        LOG.info("finished");
    }
}

