/*
 * Decompiled with CFR 0.152.
 */
package org.apache.mahout.classifier.df.mapreduce;

import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.classifier.df.DFUtils;
import org.apache.mahout.classifier.df.DecisionForest;
import org.apache.mahout.classifier.df.data.DataConverter;
import org.apache.mahout.classifier.df.data.Dataset;
import org.apache.mahout.classifier.df.data.Instance;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.RandomUtils;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Classifier {
    private static final Logger log = LoggerFactory.getLogger(Classifier.class);
    private final Path forestPath;
    private final Path inputPath;
    private final Path datasetPath;
    private final Configuration conf;
    private final Path outputPath;
    private final Path mappersOutputPath;
    private double[][] results;

    public double[][] getResults() {
        return this.results;
    }

    public Classifier(Path forestPath, Path inputPath, Path datasetPath, Path outputPath, Configuration conf) {
        this.forestPath = forestPath;
        this.inputPath = inputPath;
        this.datasetPath = datasetPath;
        this.outputPath = outputPath;
        this.conf = conf;
        this.mappersOutputPath = new Path(outputPath, "mappers");
    }

    private void configureJob(Job job) throws IOException {
        job.setJarByClass(Classifier.class);
        FileInputFormat.setInputPaths(job, this.inputPath);
        FileOutputFormat.setOutputPath(job, this.mappersOutputPath);
        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(CMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(CTextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
    }

    public void run() throws IOException, ClassNotFoundException, InterruptedException {
        FileSystem fs = FileSystem.get(this.conf);
        if (fs.exists(this.outputPath)) {
            throw new IOException("Output path already exists : " + this.outputPath);
        }
        log.info("Adding the dataset to the DistributedCache");
        DistributedCache.addCacheFile(this.datasetPath.toUri(), this.conf);
        log.info("Adding the decision forest to the DistributedCache");
        DistributedCache.addCacheFile(this.forestPath.toUri(), this.conf);
        Job job = new Job(this.conf, "decision forest classifier");
        log.info("Configuring the job...");
        this.configureJob(job);
        log.info("Running the job...");
        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("Job failed!");
        }
        this.parseOutput(job);
        HadoopUtil.delete(this.conf, this.mappersOutputPath);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void parseOutput(JobContext job) throws IOException {
        Configuration conf = job.getConfiguration();
        FileSystem fs = this.mappersOutputPath.getFileSystem(conf);
        Path[] outfiles = DFUtils.listOutputFiles(fs, this.mappersOutputPath);
        ArrayList<double[]> resList = Lists.newArrayList();
        for (Path path : outfiles) {
            DataOutputStream ofile = null;
            try {
                for (Pair record : new SequenceFileIterable(path, true, conf)) {
                    double key = ((DoubleWritable)record.getFirst()).get();
                    String value = ((Text)record.getSecond()).toString();
                    if (ofile == null) {
                        ofile = fs.create(new Path(this.outputPath, value).suffix(".out"));
                        continue;
                    }
                    ofile.writeChars(value);
                    ofile.writeChar(10);
                    resList.add(new double[]{key, Double.valueOf(value)});
                }
            }
            finally {
                Closeables.close(ofile, false);
            }
        }
        this.results = new double[resList.size()][2];
        resList.toArray((T[])this.results);
    }

    public static class CMapper
    extends Mapper<LongWritable, Text, DoubleWritable, Text> {
        private DataConverter converter;
        private DecisionForest forest;
        private final Random rng = RandomUtils.getRandom();
        private boolean first = true;
        private final Text lvalue = new Text();
        private Dataset dataset;
        private final DoubleWritable lkey = new DoubleWritable();

        @Override
        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            Path[] files = HadoopUtil.getCachedFiles(conf);
            if (files.length < 2) {
                throw new IOException("not enough paths in the DistributedCache");
            }
            this.dataset = Dataset.load(conf, files[0]);
            this.converter = new DataConverter(this.dataset);
            this.forest = DecisionForest.load(conf, files[1]);
            if (this.forest == null) {
                throw new InterruptedException("DecisionForest not found!");
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            String line;
            if (this.first) {
                FileSplit split = (FileSplit)context.getInputSplit();
                Path path = split.getPath();
                this.lvalue.set(path.getName());
                this.lkey.set(key.get());
                context.write(this.lkey, this.lvalue);
                this.first = false;
            }
            if (!(line = value.toString()).isEmpty()) {
                Instance instance = this.converter.convert(line);
                double prediction = this.forest.classify(this.dataset, this.rng, instance);
                this.lkey.set(this.dataset.getLabel(instance));
                this.lvalue.set(Double.toString(prediction));
                context.write(this.lkey, this.lvalue);
            }
        }
    }

    private static class CTextInputFormat
    extends TextInputFormat {
        private CTextInputFormat() {
        }

        @Override
        protected boolean isSplitable(JobContext jobContext, Path path) {
            return false;
        }
    }
}

