/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.mapreduce.tools;

import com.mapr.db.Admin;
import com.mapr.db.TableDescriptor;
import com.mapr.db.impl.BaseJsonTable;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.impl.MapRDBTableImpl;
import com.mapr.db.impl.TableDescriptorImpl;
import com.mapr.db.mapreduce.BulkLoadOutputFormat;
import com.mapr.db.mapreduce.BulkLoadRecordWriter;
import com.mapr.db.mapreduce.DocumentSerialization;
import com.mapr.db.mapreduce.MapRDBMapReduceUtil;
import com.mapr.db.mapreduce.TableOutputFormat;
import com.mapr.db.mapreduce.ValueSerialization;
import com.mapr.db.mapreduce.impl.MapReduceUtilMethods;
import com.mapr.db.rowcol.DBDocumentImpl;
import com.mapr.db.rowcol.DBValueBuilderImpl;
import com.mapr.db.rowcol.IdValueComparator;
import com.mapr.db.rowcol.KeyValue;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.ojai.Document;
import org.ojai.DocumentStream;
import org.ojai.Value;
import org.ojai.json.Json;
import org.ojai.json.mapreduce.JSONFileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ImportJSON
extends Configured
implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(ImportJSON.class);
    private static final String NAME = "importJSON";
    public static final String TABLE_NAME = "import.table.name";
    private static int NUM_REDUCE_TASKS;
    private static String srcPath;
    private static String dstPath;
    private static boolean bulkLoad;
    private static boolean mapreduce;
    private static int numThreads;
    private static String columnSpec;
    private static String keyField;
    private static boolean isSuccess;

    public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path inputDir = new Path(srcPath);
        Job job = Job.getInstance((Configuration)conf, (String)"importJSON_import.table.name");
        job.setJarByClass(ImportJSON.class);
        JSONFileInputFormat.setInputPaths((Job)job, (Path[])new Path[]{inputDir});
        job.setInputFormatClass(JSONFileInputFormat.class);
        Configuration config = job.getConfiguration();
        config.setStrings("io.serializations", new String[]{config.get("io.serializations"), DocumentSerialization.class.getName(), ValueSerialization.class.getName()});
        config.set(TABLE_NAME, dstPath);
        job.setMapperClass(JsonTextImporter.class);
        job.setOutputKeyClass(KeyValue.class);
        job.setSortComparatorClass(IdValueComparator.class);
        job.setOutputValueClass(DBDocumentImpl.class);
        job.setSpeculativeExecution(false);
        if (keyField != null) {
            config.set("idField", keyField);
        }
        if (!bulkLoad) {
            job.setOutputFormatClass(TableOutputFormat.class);
            config.set("maprdb.mapred.outputtable", dstPath);
            NUM_REDUCE_TASKS = 0;
        } else {
            job.setOutputFormatClass(BulkLoadOutputFormat.class);
            config.set("MapRDBImpl.mapreduce.bulkloadrecordwriter.outputTable", dstPath);
            NUM_REDUCE_TASKS = MapRDBMapReduceUtil.configurePartitioner((Job)job, (String)dstPath);
        }
        job.setNumReduceTasks(NUM_REDUCE_TASKS);
        return job;
    }

    private void createTable(Admin maprAdmin, String path) {
        TableDescriptorImpl tableDesc = new TableDescriptorImpl();
        tableDesc.setPath(path);
        tableDesc.setBulkLoad(bulkLoad);
        maprAdmin.createTable((TableDescriptor)tableDesc);
    }

    private static void Usage(String errorMsg) {
        if (errorMsg != null && errorMsg.length() > 0) {
            System.err.println("ERROR: " + errorMsg);
        }
        System.err.println("Usage: importJSON [options] -src <Input text file/directory path> -dst <MapR-DB Destination table path>\nOptions:\n[-idfield <Name of ID field in JSON Data>]\n[-bulkload <true|false>, default is false]\n[-mapreduce <true|false>, default is true]\n(Can not use bulkload mode with mapreduce = false)\n(If no ID field is specified, an ID field is expected to be present in the JSON Record)\n");
        System.exit(1);
    }

    private static void ParseArgs(String[] args) throws Exception {
        for (int i = 0; i < args.length; ++i) {
            if (i == args.length - 1) {
                ImportJSON.Usage("PARSE ARGS: " + args[i]);
                continue;
            }
            if (args[i].equalsIgnoreCase("-src")) {
                srcPath = args[++i];
                continue;
            }
            if (args[i].equalsIgnoreCase("-dst")) {
                dstPath = args[++i];
                continue;
            }
            if (args[i].equalsIgnoreCase("-bulkload")) {
                bulkLoad = Boolean.valueOf(args[++i]);
                continue;
            }
            if (args[i].equalsIgnoreCase("-mapreduce")) {
                mapreduce = Boolean.valueOf(args[++i]);
                continue;
            }
            if (args[i].equalsIgnoreCase("-columns")) {
                columnSpec = args[++i];
                continue;
            }
            if (args[i].equalsIgnoreCase("-idfield")) {
                keyField = args[++i];
                continue;
            }
            ImportJSON.Usage("PARSE ARGS: " + args[i]);
        }
        if (!mapreduce && bulkLoad) {
            ImportJSON.Usage("Bulkload mode in non M/R require data in file to be sorted");
        }
        if (srcPath == null || dstPath == null) {
            ImportJSON.Usage("missing -src or -dst.");
        }
        if (!MapReduceUtilMethods.checkBulkloadStatus((boolean)bulkLoad, (String)dstPath)) {
            ImportJSON.Usage("Table " + dstPath + " is in bulkload mode and can't work with bulkload = false option.");
        }
    }

    private void run_NonMR() throws Exception {
        Path inputPath = new Path(srcPath);
        Configuration config = this.getConf();
        FileSystem fs = inputPath.getFileSystem(config);
        FileStatus[] status = fs.globStatus(inputPath);
        if (status == null) {
            System.err.println("Given path " + srcPath + " does not exist. No data to copy.");
            isSuccess = false;
            return;
        }
        ArrayList<Path> filesToBeProcessed = new ArrayList<Path>();
        for (FileStatus f : status) {
            if (f.isDirectory()) {
                FileStatus[] statuses;
                for (FileStatus s : statuses = fs.listStatus(f.getPath())) {
                    if (s.isDirectory() || s.getPath().getName().startsWith("_")) continue;
                    filesToBeProcessed.add(s.getPath());
                }
                continue;
            }
            filesToBeProcessed.add(f.getPath());
        }
        long ts = System.currentTimeMillis();
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < filesToBeProcessed.size(); ++i) {
            executor.execute(new ImportJSONThread(i, (Path)filesToBeProcessed.get(i), config));
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
    }

    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(this.getConf(), args).getRemainingArgs();
        if (otherArgs.length < 2) {
            ImportJSON.Usage("Wrong number of arguments: " + otherArgs.length);
            System.exit(-1);
        }
        ImportJSON.ParseArgs(otherArgs);
        Admin maprAdmin = MapRDBImpl.newAdmin();
        if (!maprAdmin.tableExists(dstPath)) {
            this.createTable(maprAdmin, dstPath);
        }
        TableDescriptorImpl descriptor = (TableDescriptorImpl)maprAdmin.getTableDescriptor(dstPath);
        boolean destTableBulkload = descriptor.isBulkLoad();
        if (!bulkLoad && destTableBulkload) {
            bulkLoad = true;
        }
        if (!mapreduce) {
            this.run_NonMR();
            return isSuccess ? 0 : 1;
        }
        Job job = ImportJSON.createSubmittableJob(this.getConf(), otherArgs);
        boolean isJobSuccessful = job.waitForCompletion(true);
        if (descriptor.isBulkLoad()) {
            descriptor.setBulkLoad(false);
            maprAdmin.alterTable((TableDescriptor)descriptor);
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int ret = 0;
        try {
            ret = ToolRunner.run((Configuration)new Configuration(), (Tool)new ImportJSON(), (String[])args);
        }
        catch (Exception e) {
            ret = 1;
            e.printStackTrace();
        }
        System.exit(ret);
    }

    static {
        bulkLoad = false;
        mapreduce = true;
        numThreads = 16;
        columnSpec = null;
        keyField = null;
        isSuccess = false;
    }

    public static class JsonTextImporter
    extends Mapper<LongWritable, Document, Value, Document> {
        public static int counter = 0;

        public void map(LongWritable key, Document record, Mapper.Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String idField = conf.get("idField");
            String fieldPath = conf.get("import.path");
            Document rec = fieldPath != null ? (Document)record.getValue(fieldPath) : record;
            rec = (Document)DBValueBuilderImpl.KeyValueBuilder.initFrom(rec);
            Value docKey = idField == null ? rec.getId() : rec.getValue(idField);
            context.write((Object)docKey, (Object)rec);
        }
    }

    class ImportJSONThread
    extends BaseImportJSONThread {
        ImportJSONThread(int id, Path t, Configuration config) {
            super(id, t, config);
        }

        @Override
        public void run() {
            FSDataInputStream inputStream = null;
            try {
                FileSystem fs = this.filePath.getFileSystem(this.config);
                inputStream = fs.open(this.filePath);
                DocumentStream documentStream = Json.newDocumentStream((InputStream)inputStream);
                Iterator iter = documentStream.iterator();
                Value keySpecifiedInDoc = null;
                BulkLoadRecordWriter writer = null;
                if (bulkLoad) {
                    writer = new BulkLoadRecordWriter(ImportJSON.this.getConf(), new Path(dstPath));
                    this.importBulkload(iter, (RecordWriter)writer);
                    writer.close(null);
                    isSuccess = true;
                    return;
                }
                MapRDBTableImpl tab = (MapRDBTableImpl)MapRDBImpl.getTable((Configuration)this.config, (Path)new Path(dstPath));
                tab.setPrivateOption(BaseJsonTable.TablePrivateOption.PRESERVE_TIMESTAMP, true);
                int recordCount = 0;
                while (iter.hasNext()) {
                    Document docValue = (Document)iter.next();
                    if (keyField != null) {
                        keySpecifiedInDoc = docValue.getValue(keyField);
                        tab.insertOrReplace(keySpecifiedInDoc, docValue);
                    } else {
                        tab.insertOrReplace(docValue);
                    }
                    ++recordCount;
                }
                LOG.debug("recordCount " + Integer.toString(recordCount));
                tab.flush();
                tab.close();
            }
            catch (Exception io) {
                LOG.error("importJSON encountered an exception: " + io.getMessage());
                io.printStackTrace();
                isSuccess = false;
                return;
            }
            isSuccess = true;
        }

        void importBulkload(Iterator<Document> iter, RecordWriter writer) {
            while (iter.hasNext()) {
                Document docValue = iter.next();
                Value key = null;
                key = keyField != null ? docValue.getValue(keyField) : docValue.getValue("_id");
                try {
                    writer.write((Object)key, (Object)docValue);
                }
                catch (Exception io) {
                    LOG.error("importJSON encountered an exception in bulkload mode: " + io.getMessage());
                    io.printStackTrace();
                }
            }
        }
    }

    abstract class BaseImportJSONThread
    implements Runnable {
        protected Path filePath;
        protected int myid;
        protected Configuration config;

        protected BaseImportJSONThread(int id, Path filePath, Configuration config) {
            this.myid = id;
            this.filePath = filePath;
            this.config = config;
        }
    }
}

