/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Date;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.mortbay.jetty.HandlerContainer;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;

public abstract class NotificationTestCase
extends HadoopTestCase {
    private int port;
    private String contextPath = "/notification";
    private Class servletClass = NotificationServlet.class;
    private String servletPath = "/mapred";
    private Server webServer;

    private static void stdPrintln(String s) {
    }

    protected NotificationTestCase(int mode) throws IOException {
        super(mode, 4, 1, 1);
    }

    private void startHttpServer() throws Exception {
        if (this.webServer != null) {
            this.webServer.stop();
            this.webServer = null;
        }
        this.webServer = new Server(0);
        Context context = new Context((HandlerContainer)this.webServer, this.contextPath);
        context.addServlet(new ServletHolder((Servlet)new NotificationServlet()), this.servletPath);
        this.webServer.start();
        this.port = this.webServer.getConnectors()[0].getLocalPort();
    }

    private void stopHttpServer() throws Exception {
        if (this.webServer != null) {
            this.webServer.stop();
            this.webServer.destroy();
            this.webServer = null;
        }
    }

    private String getNotificationUrlTemplate() {
        return "http://localhost:" + this.port + this.contextPath + this.servletPath + "?jobId=$jobId&amp;jobStatus=$jobStatus";
    }

    @Override
    protected JobConf createJobConf() {
        JobConf conf = super.createJobConf();
        conf.setJobEndNotificationURI(this.getNotificationUrlTemplate());
        conf.setInt("job.end.retry.attempts", 3);
        conf.setInt("job.end.retry.interval", 200);
        return conf;
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        this.startHttpServer();
    }

    @Override
    protected void tearDown() throws Exception {
        this.stopHttpServer();
        super.tearDown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testMR() throws Exception {
        System.out.println(this.launchWordCount(this.createJobConf(), "a b c d e f g h", 1, 1));
        Thread thread = Thread.currentThread();
        synchronized (thread) {
            NotificationTestCase.stdPrintln("Sleeping for 2 seconds to give time for retry");
            Thread.currentThread();
            Thread.sleep(2000L);
        }
        NotificationTestCase.assertEquals((int)2, (int)NotificationServlet.counter);
        Path inDir = new Path("notificationjob/input");
        Path outDir = new Path("notificationjob/output");
        if (this.isLocalFS()) {
            String localPathRoot = System.getProperty("test.build.data", "/tmp").toString().replace(' ', '+');
            inDir = new Path(localPathRoot, inDir);
            outDir = new Path(localPathRoot, outDir);
        }
        System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir, outDir).getID());
        Thread thread2 = Thread.currentThread();
        synchronized (thread2) {
            NotificationTestCase.stdPrintln("Sleeping for 2 seconds to give time for retry");
            Thread.currentThread();
            Thread.sleep(2000L);
        }
        NotificationTestCase.assertEquals((int)4, (int)NotificationServlet.counter);
        System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir, outDir).getID());
        thread2 = Thread.currentThread();
        synchronized (thread2) {
            NotificationTestCase.stdPrintln("Sleeping for 2 seconds to give time for retry");
            Thread.currentThread();
            Thread.sleep(2000L);
        }
        NotificationTestCase.assertEquals((int)6, (int)NotificationServlet.counter);
    }

    private String launchWordCount(JobConf conf, String input, int numMaps, int numReduces) throws IOException {
        Path inDir = new Path("testing/wc/input");
        Path outDir = new Path("testing/wc/output");
        if (this.isLocalFS()) {
            String localPathRoot = System.getProperty("test.build.data", "/tmp").toString().replace(' ', '+');
            inDir = new Path(localPathRoot, inDir);
            outDir = new Path(localPathRoot, outDir);
        }
        FileSystem fs = FileSystem.get((Configuration)conf);
        fs.delete(outDir, true);
        if (!fs.mkdirs(inDir)) {
            throw new IOException("Mkdirs failed to create " + inDir.toString());
        }
        FSDataOutputStream file = fs.create(new Path(inDir, "part-0"));
        file.writeBytes(input);
        file.close();
        conf.setJobName("wordcount");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WordCount.MapClass.class);
        conf.setCombinerClass(WordCount.Reduce.class);
        conf.setReducerClass(WordCount.Reduce.class);
        FileInputFormat.setInputPaths((JobConf)conf, (Path[])new Path[]{inDir});
        FileOutputFormat.setOutputPath((JobConf)conf, (Path)outDir);
        conf.setNumMapTasks(numMaps);
        conf.setNumReduceTasks(numReduces);
        JobClient.runJob((JobConf)conf);
        return MapReduceTestUtil.readOutput(outDir, (Configuration)conf);
    }

    public static class NotificationServlet
    extends HttpServlet {
        public static int counter = 0;

        protected void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
            switch (counter) {
                case 0: {
                    Assert.assertTrue((boolean)req.getQueryString().contains("SUCCEEDED"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)req.getQueryString().contains("KILLED"));
                    break;
                }
                case 4: {
                    Assert.assertTrue((boolean)req.getQueryString().contains("FAILED"));
                }
            }
            if (counter % 2 == 0) {
                NotificationTestCase.stdPrintln(new Date().toString() + "Receiving First notification for [" + req.getQueryString() + "], returning error");
                res.sendError(400, "forcing error");
            } else {
                NotificationTestCase.stdPrintln(new Date().toString() + "Receiving Second notification for [" + req.getQueryString() + "], returning OK");
                res.setStatus(200);
            }
            ++counter;
        }
    }
}

