/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.examples.pi;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.examples.pi.Container;
import org.apache.hadoop.examples.pi.SummationWritable;
import org.apache.hadoop.examples.pi.TaskResult;
import org.apache.hadoop.examples.pi.Util;
import org.apache.hadoop.examples.pi.math.Summation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public final class DistSum
extends Configured
implements Tool {
    /** Logger for this class; also used by the nested machine, mapper and reducer classes. */
    private static final Log LOG = LogFactory.getLog(DistSum.class);
    /** Simple class name; prefixes every result line and is embedded in the configuration key below. */
    private static final String NAME = DistSum.class.getSimpleName();
    /** Job configuration key under which the number of parts per job is stored. */
    private static final String N_PARTS = "mapreduce.pi." + NAME + ".nParts";
    /** Timer handed to Util.runJob and ticked when run() finishes. */
    private final Util.Timer timer = new Util.Timer(true);
    /** Parsed command-line parameters; assigned through setParameters (run() does so before executing). */
    private Parameters parameters;

    /**
     * @return the parameters currently in use, or {@code null} if none have been set yet
     */
    Parameters getParameters() {
        return parameters;
    }

    /**
     * Replaces the parameters used by subsequent computations.
     *
     * @param p the new parameters
     */
    void setParameters(Parameters p) {
        parameters = p;
    }

    /**
     * Creates a job for the given summation and stores the settings the
     * tasks need in the job configuration.
     *
     * @param name  job name suffix; the full job name is {@code remoteDir + "/" + name}
     * @param sigma the summation, serialized into the job configuration
     * @return the newly created, not yet submitted job
     * @throws IOException if the job cannot be created
     */
    private Job createJob(String name, Summation sigma) throws IOException {
        String jobName = parameters.remoteDir + "/" + name;
        Job job = Job.getInstance(getConf(), jobName);
        Configuration jobconf = job.getConfiguration();
        job.setJarByClass(DistSum.class);

        // Values read back on the task side.
        jobconf.setInt(N_PARTS, parameters.nParts);
        SummationWritable.write(sigma, DistSum.class, jobconf);

        // Task timeout set to 0.
        jobconf.setLong("mapreduce.task.timeout", 0L);

        // Speculative execution is switched off for both map and reduce tasks.
        jobconf.setBoolean("mapreduce.map.speculative", false);
        jobconf.setBoolean("mapreduce.reduce.speculative", false);
        return job;
    }

    /**
     * Runs one job for {@code sigma}, writes the task results remotely and
     * locally, and stores the value in {@code sigma} when the results combine
     * into exactly that summation.
     *
     * <p>Returns without doing anything when the job directory could not be
     * created by {@code Util.createNonexistingDirectory}.
     *
     * @param name  the job name, also used as the job directory name
     * @param sigma the summation to compute; its value must not be set yet
     * @throws IOException if {@code sigma} already has a value or an I/O step fails
     */
    private void compute(String name, Summation sigma) throws IOException {
        if (sigma.getValue() != null) {
            throw new IOException("sigma.getValue() != null, sigma=" + sigma);
        }

        // Set up the remote job directory.
        FileSystem fs = FileSystem.get(getConf());
        Path jobDir = fs.makeQualified(new Path(parameters.remoteDir, name));
        if (!Util.createNonexistingDirectory(fs, jobDir)) {
            return;
        }

        // Configure and run the job.
        Job job = createJob(name, sigma);
        Path outputDir = new Path(jobDir, "out");
        FileOutputFormat.setOutputPath(job, outputDir);
        String startmessage = "steps/parts = " + sigma.E.getSteps() + "/" + parameters.nParts
                + " = " + Util.long2string(sigma.E.getSteps() / (long) parameters.nParts);
        Util.runJob(name, job, parameters.machine, startmessage, timer);

        // Collect the task results, persist them, then remove the job directory.
        List<TaskResult> results = Util.readJobOutputs(fs, outputDir);
        Util.writeResults(name, results, fs, parameters.remoteDir);
        fs.delete(jobDir, true);

        // Write the combined results to the local file and to Util.out.
        List<TaskResult> combined = Util.combine(results);
        try (PrintWriter writer = Util.createWriter(parameters.localDir, name)) {
            for (TaskResult taskResult : combined) {
                String line = DistSum.taskResult2string(name, taskResult);
                writer.println(line);
                writer.flush();
                Util.out.println(line);
            }
        }

        // Accept the value only if the single combined result covers sigma and vice versa.
        if (combined.size() == 1) {
            Summation computed = combined.get(0).getElement();
            if (sigma.contains(computed) && computed.contains(sigma)) {
                sigma.setValue(computed.getValue());
            }
        }
    }

    /**
     * Formats a task result as a single line: {@code NAME + " " + name + "> " + result}.
     *
     * @param name   the job name
     * @param result the task result
     * @return the formatted line
     */
    public static String taskResult2string(String name, TaskResult result) {
        StringBuilder line = new StringBuilder();
        line.append(NAME).append(" ").append(name).append("> ").append(result);
        return line.toString();
    }

    /**
     * Parses a line in the format written by {@link #taskResult2string}:
     * {@code NAME + " " + name + "> " + result}.
     *
     * @param s the line to parse
     * @return an unmodifiable entry mapping the job name to the task result,
     *         or {@code null} if {@code s} does not start with {@code NAME}
     * @throws IllegalArgumentException if {@code s} starts with {@code NAME}
     *         but contains no {@code "> "} separator after the name prefix
     */
    public static Map.Entry<String, TaskResult> string2TaskResult(String s) {
        if (!s.startsWith(NAME)) {
            return null;
        }

        // Skip NAME and the single separator character that follows it.
        int keyStart = NAME.length() + 1;
        int keyEnd = s.indexOf("> ", keyStart);
        if (keyEnd < 0) {
            // Previously this surfaced as an opaque StringIndexOutOfBoundsException from substring.
            throw new IllegalArgumentException("Missing \"> \" separator after the job name, s=" + s);
        }

        final String key = s.substring(keyStart, keyEnd);
        final TaskResult value = TaskResult.valueOf(s.substring(keyEnd + 2));
        return new Map.Entry<String, TaskResult>(){

            @Override
            public String getKey() {
                return key;
            }

            @Override
            public TaskResult getValue() {
                return value;
            }

            @Override
            public TaskResult setValue(TaskResult value2) {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Partitions {@code sigma} into {@code nJobs} summations, runs one
     * {@link Computation} per partition on {@code nThreads} threads and
     * combines the partitions afterwards.
     *
     * @param name  the base name of the jobs
     * @param sigma the summation to evaluate
     * @return the single combined summation, or {@code null} if the
     *         partitions did not combine into exactly one summation
     * @throws RuntimeException wrapping any exception thrown while executing
     */
    private Summation execute(String name, Summation sigma) {
        Summation[] parts = sigma.partition(parameters.nJobs);

        ArrayList<Computation> tasks = new ArrayList<Computation>();
        int index = 0;
        for (Summation part : parts) {
            tasks.add(new Computation(index, name, part));
            index++;
        }

        try {
            Util.execute(parameters.nThreads, tasks);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }

        List<Summation> combined = Util.combine(Arrays.asList(parts));
        if (combined.size() != 1) {
            return null;
        }
        return combined.get(0);
    }

    /**
     * Command-line entry point of the tool.
     *
     * <p>Expects {@code <name> <sigma>} followed by the {@link Parameters}
     * arguments. Prints the usage message when the argument count is wrong.
     *
     * @param args the command-line arguments
     * @return 0 if the computed result equals the requested summation,
     *         1 if it does not (including when no single combined result
     *         was produced); for a wrong argument count, the value returned
     *         by {@code Util.printUsage}
     * @throws Exception if parsing the arguments or executing the jobs fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // <name> and <sigma> precede the Parameters arguments.
        if (args.length != Parameters.COUNT + 2) {
            return Util.printUsage(args, getClass().getName()
                    + " <name> <sigma> " + Parameters.LIST
                    + "\n  <name> The name."
                    + "\n  <sigma> The summation."
                    + Parameters.DESCRIPTION);
        }

        int i = 0;
        String name = args[i++];
        Summation sigma = Summation.valueOf(args[i++]);
        setParameters(Parameters.parse(args, i));

        Util.out.println();
        Util.out.println("name  = " + name);
        Util.out.println("sigma = " + sigma);
        Util.out.println(parameters);
        Util.out.println();

        // execute() returns null when the partial sums do not combine into one
        // summation; treat that as an error instead of failing with an NPE.
        Summation result = execute(name, sigma);
        if (result != null && result.equals(sigma)) {
            sigma.setValue(result.getValue());
            timer.tick("\n\nDONE\n\nsigma=" + sigma);
            return 0;
        }
        timer.tick("\n\nDONE WITH ERROR\n\nresult=" + result);
        return 1;
    }

    /**
     * Runs the tool through {@link ToolRunner} and exits the JVM with the
     * tool's return code.
     *
     * @param args the command-line arguments
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(null, new DistSum(), args);
        System.exit(exitCode);
    }

    class Computation
    implements Callable<Computation> {
        private final int index;
        private final String name;
        private final Summation sigma;

        Computation(int index, String name, Summation sigma) {
            this.index = index;
            this.name = name;
            this.sigma = sigma;
        }

        String getJobName() {
            return String.format("%s.job%03d", this.name, this.index);
        }

        public String toString() {
            return this.getJobName() + this.sigma;
        }

        @Override
        public Computation call() {
            if (this.sigma.getValue() == null) {
                try {
                    DistSum.this.compute(this.getJobName(), this.sigma);
                }
                catch (Exception e) {
                    Util.out.println("ERROR: Got an exception from " + this.getJobName());
                    e.printStackTrace(Util.out);
                }
            }
            return this;
        }
    }

    /**
     * A machine that picks {@link MapSide} or {@link ReduceSide} for each job
     * based on the free map and reduce slots reported by the cluster.
     */
    public static class MixMachine
    extends Machine {
        private static final MixMachine INSTANCE = new MixMachine();
        /** Created on the first call to {@link #init(Job)} and reused afterwards. */
        private Cluster cluster;

        /**
         * Connects to the cluster if not yet connected, then delegates to the
         * machine chosen for this job.
         *
         * @param job the job to initialize
         * @throws IOException if the cluster cannot be reached or the wait
         *         for free slots is interrupted
         */
        @Override
        public synchronized void init(Job job) throws IOException {
            Configuration conf = job.getConfiguration();
            if (this.cluster == null) {
                String jobTrackerStr = conf.get("mapreduce.jobtracker.address", "localhost:8012");
                this.cluster = new Cluster(NetUtils.createSocketAddr(jobTrackerStr), conf);
            }
            this.chooseMachine(conf).init(job);
        }

        /**
         * Polls the cluster status every two seconds until either the free
         * map slots or the free reduce slots can hold all parts of the job.
         * The reduce side is preferred when it has enough free slots.
         *
         * @param conf the job configuration holding the number of parts
         * @return the chosen machine
         * @throws IOException if the cluster status cannot be read, or if the
         *         thread is interrupted while waiting (the interrupt status
         *         is restored before throwing)
         */
        private Machine chooseMachine(Configuration conf) throws IOException {
            int parts = conf.getInt(N_PARTS, Integer.MAX_VALUE);
            try {
                while (true) {
                    ClusterMetrics status = this.cluster.getClusterStatus();
                    int m = status.getMapSlotCapacity() - status.getOccupiedMapSlots();
                    int r = status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
                    if (m >= parts || r >= parts) {
                        Machine value = r >= parts ? ReduceSide.INSTANCE : MapSide.INSTANCE;
                        Util.out.println("  " + this + " is " + value + " (m=" + m + ", r=" + r + ")");
                        return value;
                    }
                    Thread.sleep(2000L);
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }

    /**
     * A machine that evaluates the summations in the reducers: a single
     * mapper partitions the summation and each part is routed to its own
     * reducer.
     */
    public static class ReduceSide
    extends Machine {
        private static final ReduceSide INSTANCE = new ReduceSide();

        /**
         * Configures the job with the partitioning mapper, the index
         * partitioner, the summing reducer and one reduce task per part.
         *
         * @param job the job to initialize
         */
        @Override
        public void init(Job job) {
            job.setMapperClass(PartitionMapper.class);
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(SummationWritable.class);
            job.setPartitionerClass(IndexPartitioner.class);
            job.setReducerClass(SummingReducer.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(TaskResult.class);
            Configuration conf = job.getConfiguration();
            int nParts = conf.getInt(N_PARTS, 1);
            job.setNumReduceTasks(nParts);
            job.setInputFormatClass(SummationInputFormat.class);
        }

        /** Computes every summation it receives and emits one task result per summation. */
        public static class SummingReducer
        extends Reducer<IntWritable, SummationWritable, NullWritable, TaskResult> {
            // Uses the parameterized Context inherited from Reducer (not the raw
            // Reducer.Context) so that this method overrides Reducer.reduce.
            @Override
            protected void reduce(IntWritable index, Iterable<SummationWritable> sums, Context context) throws IOException, InterruptedException {
                LOG.info("index=" + index);
                for (SummationWritable sigma : sums) {
                    Machine.compute(sigma.getElement(), context);
                }
            }
        }

        /** Uses the key itself as the partition number. */
        public static class IndexPartitioner
        extends Partitioner<IntWritable, SummationWritable> {
            // NOTE(review): the key is returned as-is, without reducing it modulo
            // numPartitions; this presumably relies on PartitionMapper emitting
            // keys in [0, nParts) with nParts reduce tasks - confirm.
            @Override
            public int getPartition(IntWritable index, SummationWritable value, int numPartitions) {
                return index.get();
            }
        }

        /** Splits the incoming summation into parts and emits each part keyed by its index. */
        public static class PartitionMapper
        extends Mapper<NullWritable, SummationWritable, IntWritable, SummationWritable> {
            // Uses the parameterized Context inherited from Mapper so that the
            // writes below are type-checked.
            @Override
            protected void map(NullWritable nw, SummationWritable sigma, Context context) throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                int nParts = conf.getInt(N_PARTS, 0);
                Summation[] parts = sigma.getElement().partition(nParts);
                for (int i = 0; i < parts.length; ++i) {
                    context.write(new IntWritable(i), new SummationWritable(parts[i]));
                    LOG.info("parts[" + i + "] = " + parts[i]);
                }
            }
        }

        /** Input format producing a single split that holds the whole summation. */
        public static class SummationInputFormat
        extends Machine.AbstractInputFormat {
            @Override
            public List<InputSplit> getSplits(JobContext context) {
                Configuration conf = context.getConfiguration();
                Summation sigma = SummationWritable.read(DistSum.class, conf);
                ArrayList<InputSplit> splits = new ArrayList<InputSplit>(1);
                splits.add(new Machine.SummationSplit(sigma));
                return splits;
            }
        }
    }

    /**
     * A machine that evaluates the summations in the mappers: the input
     * format creates one split per part and the job runs without reducers.
     */
    public static class MapSide
    extends Machine {
        private static final MapSide INSTANCE = new MapSide();

        /**
         * Configures the job as a map-only job using the summing mapper and
         * the partitioning input format.
         *
         * @param job the job to initialize
         */
        @Override
        public void init(Job job) {
            job.setMapperClass(SummingMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(TaskResult.class);
            job.setNumReduceTasks(0);
            job.setInputFormatClass(PartitionInputFormat.class);
        }

        /** Computes the summation of its split and emits the task result. */
        public static class SummingMapper
        extends Mapper<NullWritable, SummationWritable, NullWritable, TaskResult> {
            // Uses the parameterized Context inherited from Mapper (not the raw
            // Mapper.Context) so the call to Machine.compute is type-checked.
            @Override
            protected void map(NullWritable nw, SummationWritable sigma, Context context) throws IOException, InterruptedException {
                Machine.compute(sigma.getElement(), context);
            }
        }

        /** Input format that partitions the summation into one split per part. */
        public static class PartitionInputFormat
        extends Machine.AbstractInputFormat {
            @Override
            public List<InputSplit> getSplits(JobContext context) {
                Configuration conf = context.getConfiguration();
                Summation sigma = SummationWritable.read(DistSum.class, conf);
                int nParts = conf.getInt(N_PARTS, 0);
                ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nParts);
                for (Summation part : sigma.partition(nParts)) {
                    splits.add(new Machine.SummationSplit(part));
                }
                return splits;
            }
        }
    }

    /**
     * Base class of the job configurations ("machines"). It also hosts the
     * shared task-side computation, the common input format and the input
     * split type.
     */
    public static abstract class Machine {
        /**
         * Configures the given job for this machine.
         *
         * @param job the job to initialize
         * @throws IOException if the job cannot be initialized
         */
        abstract void init(Job job) throws IOException;

        /** @return the simple class name of the concrete machine */
        @Override
        public String toString() {
            return this.getClass().getSimpleName();
        }

        /**
         * Computes {@code sigma}, measures the wall-clock time in
         * milliseconds and writes the resulting {@link TaskResult} to the
         * context. The task status is updated before and after computing.
         *
         * @param sigma   the summation to compute
         * @param context the task context receiving status updates and the result
         * @throws IOException          if writing the result fails
         * @throws InterruptedException if writing the result is interrupted
         */
        static void compute(Summation sigma, TaskInputOutputContext<?, ?, NullWritable, TaskResult> context) throws IOException, InterruptedException {
            String s = "sigma=" + sigma;
            LOG.info(s);
            context.setStatus(s);

            long start = System.currentTimeMillis();
            sigma.compute();
            long duration = System.currentTimeMillis() - start;

            TaskResult result = new TaskResult(sigma, duration);
            s = "result=" + result;
            LOG.info(s);
            context.setStatus(s);
            // No (Object) casts here: write() expects NullWritable/TaskResult for this context type.
            context.write(NullWritable.get(), result);
        }

        /**
         * Input format whose record reader yields exactly one record per
         * split: a null key and the split's summation as the value.
         */
        public static abstract class AbstractInputFormat
        extends InputFormat<NullWritable, SummationWritable> {
            @Override
            public final RecordReader<NullWritable, SummationWritable> createRecordReader(InputSplit generic, TaskAttemptContext context) {
                final SummationSplit split = (SummationSplit)generic;
                return new RecordReader<NullWritable, SummationWritable>(){
                    /** Set once the single record has been handed out. */
                    boolean done = false;

                    @Override
                    public void initialize(InputSplit split2, TaskAttemptContext context) {
                    }

                    /** @return true on the first call only */
                    @Override
                    public boolean nextKeyValue() {
                        if (this.done) {
                            return false;
                        }
                        this.done = true;
                        return true;
                    }

                    @Override
                    public NullWritable getCurrentKey() {
                        return NullWritable.get();
                    }

                    @Override
                    public SummationWritable getCurrentValue() {
                        return new SummationWritable(split.getElement());
                    }

                    @Override
                    public float getProgress() {
                        return this.done ? 1.0f : 0.0f;
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        }

        /** An input split carrying a single summation. */
        public static final class SummationSplit
        extends InputSplit
        implements Writable,
        Container<Summation> {
            private static final String[] EMPTY = new String[0];
            private Summation sigma;

            /** No-argument constructor; the summation is populated by {@link #readFields(DataInput)}. */
            public SummationSplit() {
            }

            private SummationSplit(Summation sigma) {
                this.sigma = sigma;
            }

            @Override
            public Summation getElement() {
                return this.sigma;
            }

            /** @return always 1 */
            @Override
            public long getLength() {
                return 1L;
            }

            /** @return an empty array: the split reports no locations */
            @Override
            public String[] getLocations() {
                return EMPTY;
            }

            @Override
            public void readFields(DataInput in) throws IOException {
                this.sigma = SummationWritable.read(in);
            }

            @Override
            public void write(DataOutput out) throws IOException {
                new SummationWritable(this.sigma).write(out);
            }
        }
    }

    /** Immutable holder of the command-line parameters shared by all jobs of a run. */
    static class Parameters {
        /** Number of command-line arguments consumed by {@link #parse(String[], int)}. */
        static final int COUNT = 6;
        /** Argument list, for usage messages. */
        static final String LIST = "<nThreads> <nJobs> <type> <nPart> <remoteDir> <localDir>";
        /** Per-argument description, for usage messages. */
        static final String DESCRIPTION = "\n  <nThreads> The number of working threads.\n  <nJobs> The number of jobs per sum.\n  <type> 'm' for map side job, 'r' for reduce side job, 'x' for mix type.\n  <nPart> The number of parts per job.\n  <remoteDir> Remote directory for submitting jobs.\n  <localDir> Local directory for storing output files.";
        final int nThreads;
        final int nJobs;
        final int nParts;
        final Machine machine;
        final String remoteDir;
        final File localDir;

        private Parameters(Machine machine, int nThreads, int nJobs, int nParts, String remoteDir, File localDir) {
            this.machine = machine;
            this.nThreads = nThreads;
            this.nJobs = nJobs;
            this.nParts = nParts;
            this.remoteDir = remoteDir;
            this.localDir = localDir;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("\nnThreads  = ").append(nThreads);
            text.append("\nnJobs     = ").append(nJobs);
            text.append("\nnParts    = ").append(nParts).append(" (").append(machine).append(")");
            text.append("\nremoteDir = ").append(remoteDir);
            text.append("\nlocalDir  = ").append(localDir);
            return text.toString();
        }

        /**
         * Parses {@link #COUNT} arguments starting at index {@code i}, in the
         * order given by {@link #LIST}.
         *
         * @param args the argument array
         * @param i    index of the first argument to read
         * @return the parsed parameters
         * @throws IllegalArgumentException if fewer than {@link #COUNT}
         *         arguments remain, a number cannot be parsed or is not
         *         positive, or the type is not one of m, r, x
         */
        static Parameters parse(String[] args, int i) {
            if (args.length - i < COUNT) {
                throw new IllegalArgumentException("args.length - i < COUNT = " + COUNT + ", args.length=" + args.length + ", i=" + i + ", args=" + Arrays.asList(args));
            }

            // Read the arguments in their positional order.
            int nThreads = Integer.parseInt(args[i]);
            int nJobs = Integer.parseInt(args[i + 1]);
            String type = args[i + 2];
            int nParts = Integer.parseInt(args[i + 3]);
            String remoteDir = args[i + 4];
            File localDir = new File(args[i + 5]);

            // Validate.
            if (!"m".equals(type) && !"r".equals(type) && !"x".equals(type)) {
                throw new IllegalArgumentException("type=" + type + " is not equal to m, r or x");
            }
            if (nParts <= 0) {
                throw new IllegalArgumentException("nParts = " + nParts + " <= 0");
            }
            if (nJobs <= 0) {
                throw new IllegalArgumentException("nJobs = " + nJobs + " <= 0");
            }
            if (nThreads <= 0) {
                throw new IllegalArgumentException("nThreads = " + nThreads + " <= 0");
            }
            Util.checkDirectory(localDir);

            // Map the type letter to its machine; anything other than m or r is x here.
            Machine machine;
            if ("m".equals(type)) {
                machine = MapSide.INSTANCE;
            } else if ("r".equals(type)) {
                machine = ReduceSide.INSTANCE;
            } else {
                machine = MixMachine.INSTANCE;
            }
            return new Parameters(machine, nThreads, nJobs, nParts, remoteDir, localDir);
        }
    }
}

