/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.mapreduce.tools;

import com.mapr.db.Admin;
import com.mapr.db.MapRDB;
import com.mapr.db.TableDescriptor;
import com.mapr.db.impl.IdCodec;
import com.mapr.db.impl.MapRDBTableImpl;
import com.mapr.db.impl.TableDescriptorImpl;
import com.mapr.db.mapreduce.BulkLoadOutputFormat;
import com.mapr.db.mapreduce.BulkLoadRecordWriter;
import com.mapr.db.mapreduce.DocumentSerialization;
import com.mapr.db.mapreduce.TableOutputFormat;
import com.mapr.db.mapreduce.TotalOrderPartitioner;
import com.mapr.db.mapreduce.ValueSerialization;
import com.mapr.db.mapreduce.impl.JsonBulkLoadReducer;
import com.mapr.db.rowcol.DBDocumentImpl;
import com.mapr.db.rowcol.IdValueComparator;
import com.mapr.db.rowcol.KeyValue;
import com.mapr.db.rowcol.KeyValueBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.ojai.Document;
import org.ojai.DocumentStream;
import org.ojai.Value;
import org.ojai.json.Json;
import org.ojai.json.mapreduce.JSONFileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line tool that imports JSON files into a destination table.
 *
 * <p>Depending on the parsed options, {@link #run(String[])} either submits a
 * MapReduce job built by {@link #createSubmittableJob} or reads the input files
 * locally with a fixed-size thread pool. All options are held in static fields,
 * so they are shared by every instance in the JVM.
 */
public class ImportJSON
extends Configured
implements Tool {
    /** Logger for this tool. */
    private static final Logger LOG = LoggerFactory.getLogger(ImportJSON.class);
    /** Name of this tool. */
    private static final String NAME = "importJSON";
    /** Configuration key under which the destination table path is stored for the job. */
    public static final String TABLE_NAME = "import.table.name";
    /** Reducer count applied to the job: 0 without bulk load, otherwise the number of partition split points. */
    private static int NUM_REDUCE_TASKS;
    /** Input file, directory or glob, taken from {@code -src}. */
    private static String srcPath;
    /** Destination table path, taken from {@code -dst}. */
    private static String dstPath;
    /** Whether to import in bulk-load mode ({@code -bulkload}); initialised to {@code true}. */
    private static boolean bulkLoad;
    /** Whether to import through a MapReduce job ({@code -mapreduce}); initialised to {@code true}. */
    private static boolean mapreduce;
    /** Size of the thread pool used by the non-MapReduce import; initialised to 16. */
    private static int numThreads;
    /** Value of {@code -columns}; parsed but not read anywhere else in this file. */
    private static String columnSpec;
    /** Name of the JSON field used as document id ({@code -idfield}); {@code null} means use the document's own id. */
    private static String keyField;
    /** Outcome of the non-MapReduce import; written by the import threads and read by {@link #run(String[])}. */
    private static boolean isSuccess;

    /**
     * Builds the MapReduce job that imports the JSON input into the destination table.
     *
     * <p>Without bulk load the job is map-only and writes through {@link TableOutputFormat}.
     * With bulk load the job uses {@link TotalOrderPartitioner}: the table's partition split
     * points are written to a partition file under {@code hadoop.tmp.dir}, that file is added
     * to the job's cache files, and one reducer is configured per split point.
     *
     * @param conf base configuration for the job
     * @param args command-line arguments; not read by this method (the options come from the
     *             static fields populated by {@code ParseArgs})
     * @return the configured job, not yet submitted
     * @throws IOException if the partition file cannot be written or its path is not a valid URI
     * @throws ClassNotFoundException declared for callers; not raised directly by this method
     * @throws InterruptedException declared for callers; not raised directly by this method
     */
    public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path inputDir = new Path(srcPath);
        // NOTE(review): the job name is a fixed literal (tool name + "_" + the TABLE_NAME key),
        // not the actual destination table path - confirm whether that is intended.
        Job job = Job.getInstance((Configuration)conf, (String)"importJSON_import.table.name");
        job.setJarByClass(ImportJSON.class);
        JSONFileInputFormat.setInputPaths((Job)job, (Path[])new Path[]{inputDir});
        job.setInputFormatClass(JSONFileInputFormat.class);
        Configuration config = job.getConfiguration();

        // Append the document/value serializations to whatever is already configured. When the
        // key is unset, skip it so the literal string "null" does not end up in the list.
        String existingSerializations = config.get("io.serializations");
        if (existingSerializations == null) {
            config.setStrings("io.serializations", new String[]{DocumentSerialization.class.getName(), ValueSerialization.class.getName()});
        } else {
            config.setStrings("io.serializations", new String[]{existingSerializations, DocumentSerialization.class.getName(), ValueSerialization.class.getName()});
        }

        config.set(TABLE_NAME, dstPath);
        job.setMapperClass(JsonTextImporter.class);
        job.setOutputKeyClass(KeyValue.class);
        job.setSortComparatorClass(IdValueComparator.class);
        job.setOutputValueClass(DBDocumentImpl.class);
        job.setSpeculativeExecution(false);
        if (keyField != null) {
            config.set("idField", keyField);
        }
        if (!bulkLoad) {
            // Map-only job writing straight to the table.
            job.setOutputFormatClass(TableOutputFormat.class);
            config.set("maprdb.mapred.outputtable", dstPath);
            NUM_REDUCE_TASKS = 0;
        } else {
            job.setOutputFormatClass(BulkLoadOutputFormat.class);
            config.set("maprdb.mapreduce.bulkloadrecordwriter.outputTable", dstPath);
            String uuid = UUID.randomUUID().toString();
            job.setPartitionerClass(TotalOrderPartitioner.class);
            Path partitionFile = new Path(config.get("hadoop.tmp.dir"), "partitions_" + uuid);
            TotalOrderPartitioner.setPartitionFile((Configuration)config, (Path)partitionFile);
            List partitionSplitPoints = BulkLoadOutputFormat.getPartitionSplitPoints((String)dstPath);
            ImportJSON.writePartitions(job, partitionFile, partitionSplitPoints);
            job.setReducerClass(JsonBulkLoadReducer.class);
            NUM_REDUCE_TASKS = partitionSplitPoints.size();
            try {
                URI partitionUri = new URI(partitionFile.toString());
                job.addCacheFile(partitionUri);
            }
            catch (URISyntaxException e) {
                // Without the cache file the partitioner cannot load its split points, so fail
                // here instead of submitting a job that is bound to break later.
                throw new IOException("Invalid partition file URI: " + partitionFile, e);
            }
        }
        job.setNumReduceTasks(NUM_REDUCE_TASKS);
        return job;
    }

    /**
     * Writes the partition file consumed by the total-order partitioner.
     *
     * <p>The start keys are sorted, the lowest one (which must be empty) is dropped, and each
     * remaining key is decoded with {@link IdCodec} and appended to a sequence file whose key
     * class is the job's map output key class and whose value is {@link NullWritable}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param job            job whose configuration and map output key class are used
     * @param partitionsPath location of the sequence file to create
     * @param startKeys      encoded start keys, one per partition
     * @throws IOException              if the sequence file cannot be created or written
     * @throws IllegalArgumentException if {@code startKeys} is empty or its lowest key is not empty
     */
    private static void writePartitions(Job job, Path partitionsPath, List<ByteBuffer> startKeys) throws IOException, IllegalArgumentException {
        Configuration jobConf = job.getConfiguration();
        if (startKeys.isEmpty()) {
            throw new IllegalArgumentException("No regions passed");
        }

        // Sort the keys, then detach the lowest one: it is validated and never written.
        TreeSet<ByteBuffer> orderedKeys = new TreeSet<ByteBuffer>(startKeys);
        ByteBuffer lowest = orderedKeys.pollFirst();
        if (lowest.limit() != 0) {
            throw new IllegalArgumentException("First region of table should have empty start key. Instead has: " + lowest);
        }

        FileSystem targetFs = partitionsPath.getFileSystem(jobConf);
        try (SequenceFile.Writer out = SequenceFile.createWriter((FileSystem)targetFs, (Configuration)jobConf, (Path)partitionsPath, (Class)job.getMapOutputKeyClass(), NullWritable.class);){
            Iterator<ByteBuffer> remaining = orderedKeys.iterator();
            while (remaining.hasNext()) {
                ByteBuffer encodedKey = remaining.next();
                out.append((Object)IdCodec.decode((ByteBuffer)encodedKey), (Object)NullWritable.get());
            }
        }
    }

    /**
     * Creates the destination table with the current bulk-load setting.
     *
     * @param maprAdmin admin handle used to create the table
     * @param path      path of the table to create
     */
    private void createTable(Admin maprAdmin, String path) {
        TableDescriptorImpl descriptor = new TableDescriptorImpl();
        descriptor.setPath(path);
        // The table's bulk-load flag follows the -bulkload option.
        descriptor.setBulkLoad(bulkLoad);
        TableDescriptor toCreate = (TableDescriptor)descriptor;
        maprAdmin.createTable(toCreate);
    }

    /**
     * Prints an optional error line followed by the usage text to standard error, then
     * terminates the JVM with exit code 1. This method does not return.
     *
     * @param errorMsg error to report before the usage text; skipped when {@code null} or empty
     */
    private static void Usage(String errorMsg) {
        boolean nothingToReport = errorMsg == null || errorMsg.isEmpty();
        if (!nothingToReport) {
            System.err.println("ERROR: " + errorMsg);
        }
        String usageText = "Usage: importJSON [options] -src <Input text file/directory path> -dst <MapR-DB Destination table path>\nOptions:\n[-idfield <Name of ID field in JSON Data>]\n[-bulkload <true|false>, default is true]\n[-mapreduce <true|false>, default is true]\n(Can not use bulkload mode with mapreduce = false)\n(If no ID field is specified, an ID field is expected to be present in the JSON Record)\n";
        System.err.println(usageText);
        System.exit(1);
    }

    /**
     * Parses the tool's command-line options into the static option fields.
     *
     * <p>Recognised options (case-insensitive): {@code -src}, {@code -dst}, {@code -bulkload},
     * {@code -mapreduce}, {@code -columns} and {@code -idfield}; each takes one value. An
     * unknown option, an option without a value, a missing {@code -src}/{@code -dst}, or
     * bulk load combined with {@code -mapreduce false} prints the usage text and exits.
     *
     * @param args arguments remaining after generic Hadoop options were removed
     * @throws Exception declared for callers; not raised directly by this method
     */
    private static void ParseArgs(String[] args) throws Exception {
        for (int i = 0; i < args.length; ++i) {
            String option = args[i];
            if (option.equalsIgnoreCase("-src")) {
                srcPath = ImportJSON.requireOptionValue(args, ++i);
            } else if (option.equalsIgnoreCase("-dst")) {
                dstPath = ImportJSON.requireOptionValue(args, ++i);
            } else if (option.equalsIgnoreCase("-bulkload")) {
                bulkLoad = Boolean.parseBoolean(ImportJSON.requireOptionValue(args, ++i));
            } else if (option.equalsIgnoreCase("-mapreduce")) {
                mapreduce = Boolean.parseBoolean(ImportJSON.requireOptionValue(args, ++i));
            } else if (option.equalsIgnoreCase("-columns")) {
                columnSpec = ImportJSON.requireOptionValue(args, ++i);
            } else if (option.equalsIgnoreCase("-idfield")) {
                keyField = ImportJSON.requireOptionValue(args, ++i);
            } else {
                System.err.println("PARSE ARGS: " + option);
                ImportJSON.Usage(null);
            }
        }
        if (!mapreduce && bulkLoad) {
            ImportJSON.Usage("Bulkload mode in non M/R require data in file to be sorted");
        }
        if (srcPath == null || dstPath == null) {
            ImportJSON.Usage("missing -src or -dst.");
        }
    }

    /**
     * Returns the value that follows an option, or prints usage and exits when the option is
     * the last argument (previously this raised {@code ArrayIndexOutOfBoundsException}).
     *
     * @param args       full argument array
     * @param valueIndex index at which the option's value is expected
     * @return the argument at {@code valueIndex}
     */
    private static String requireOptionValue(String[] args, int valueIndex) {
        if (valueIndex >= args.length) {
            ImportJSON.Usage("Missing value for option " + args[valueIndex - 1]);
        }
        return args[valueIndex];
    }

    /**
     * Imports the input without MapReduce: resolves the source path (glob allowed), collects
     * the files to read, and processes each one on a fixed-size thread pool.
     *
     * <p>For a matched directory only its direct children are taken; sub-directories and
     * entries whose name starts with {@code _} are skipped. The method blocks until every
     * import task has finished. Each task records its outcome in {@code isSuccess}.
     *
     * <p>NOTE(review): {@code isSuccess} is a single shared flag written by every task, so the
     * value seen by the caller is that of whichever task wrote last.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting; running
     *                              tasks are cancelled and the interrupt flag is restored
     * @throws Exception            if the file system cannot be accessed
     */
    private void run_NonMR() throws Exception {
        Path inputPath = new Path(srcPath);
        Configuration config = this.getConf();
        FileSystem fs = inputPath.getFileSystem(config);
        FileStatus[] status = fs.globStatus(inputPath);
        // An empty result (e.g. a glob that matches nothing) is reported the same way as a
        // missing path instead of failing silently.
        if (status == null || status.length == 0) {
            System.err.println("Given path " + srcPath + " does not exist. No data to copy.");
            isSuccess = false;
            return;
        }
        List<Path> filesToBeProcessed = new ArrayList<Path>();
        for (FileStatus f : status) {
            if (!f.isDirectory()) {
                filesToBeProcessed.add(f.getPath());
                continue;
            }
            for (FileStatus s : fs.listStatus(f.getPath())) {
                if (s.isDirectory() || s.getPath().getName().startsWith("_")) continue;
                filesToBeProcessed.add(s.getPath());
            }
        }
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        try {
            for (int i = 0; i < filesToBeProcessed.size(); ++i) {
                executor.execute(new ImportJSONThread(i, filesToBeProcessed.get(i), config));
            }
        }
        finally {
            // Always stop accepting work so the pool threads can exit once the queue drains.
            executor.shutdown();
        }
        try {
            // Block instead of spinning on isTerminated(), which burned a CPU core.
            while (!executor.awaitTermination(1L, TimeUnit.MINUTES)) {
                LOG.debug("Waiting for import threads to finish");
            }
        }
        catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * Runs the import: parses the options, creates the destination table when it does not
     * exist, then imports either with local threads or with a MapReduce job.
     *
     * <p>The exit code is returned to the caller rather than passed to {@code System.exit},
     * so the tool can be driven through {@link ToolRunner}; {@link #main(String[])} still
     * exits the JVM with the returned code. The job is waited on exactly once.
     *
     * @param args raw command-line arguments, including any generic Hadoop options
     * @return 0 when the import succeeded, 1 otherwise
     * @throws Exception if option parsing, table creation or the import itself fails
     */
    @Override
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(this.getConf(), args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Usage() terminates the JVM; the return only guards against that ever changing.
            ImportJSON.Usage("Wrong number of arguments: " + otherArgs.length);
            return 1;
        }
        ImportJSON.ParseArgs(otherArgs);
        // NOTE(review): the admin handle is never closed here - confirm whether Admin needs it.
        Admin maprAdmin = MapRDB.newAdmin();
        if (!maprAdmin.tableExists(dstPath)) {
            this.createTable(maprAdmin, dstPath);
        }
        if (!mapreduce) {
            this.run_NonMR();
            return isSuccess ? 0 : 1;
        }
        Job job = ImportJSON.createSubmittableJob(this.getConf(), otherArgs);
        boolean isJobSuccessful = job.waitForCompletion(true);
        return isJobSuccessful ? 0 : 1;
    }

    /**
     * Command-line entry point: runs the tool through {@link ToolRunner} and exits the JVM
     * with the tool's return code, or with 1 when the tool throws.
     *
     * @param args command-line arguments
     * @throws Exception declared for callers; exceptions from the tool are caught and printed
     */
    public static void main(String[] args) throws Exception {
        int exitCode;
        try {
            Configuration baseConf = new Configuration();
            Tool importer = new ImportJSON();
            exitCode = ToolRunner.run((Configuration)baseConf, (Tool)importer, (String[])args);
        }
        catch (Exception failure) {
            failure.printStackTrace();
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    // Default option values, applied once when the class is loaded.
    static {
        // Optional settings start out unset.
        keyField = null;
        columnSpec = null;

        // Both import modes are enabled by default.
        mapreduce = true;
        bulkLoad = true;

        // Pool size for the non-MapReduce import.
        numThreads = 16;

        // No import has succeeded yet.
        isSuccess = false;
    }

    /**
     * Mapper that turns each parsed JSON document into a (document id, document) pair.
     */
    public static class JsonTextImporter
    extends Mapper<LongWritable, Document, Value, Document> {
        public static int counter = 0;

        /**
         * Emits one output record per input document.
         *
         * <p>When the {@code import.path} setting is present, the value found at that path
         * inside the record is imported instead of the whole record. The key is the value of
         * the field named by the {@code idField} setting, or the document's own id when that
         * setting is absent.
         *
         * @param key     input key supplied by the input format; not used
         * @param record  parsed JSON document
         * @param context task context used to read the configuration and write the output
         * @throws IOException          if writing the output record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        public void map(LongWritable key, Document record, Mapper.Context context) throws IOException, InterruptedException {
            Configuration jobConf = context.getConfiguration();
            String idFieldName = jobConf.get("idField");
            String subDocumentPath = jobConf.get("import.path");

            Document selected;
            if (subDocumentPath == null) {
                selected = record;
            } else {
                selected = (Document)record.getValue(subDocumentPath);
            }
            Document converted = (Document)KeyValueBuilder.initFrom((Document)selected);

            Value docKey;
            if (idFieldName != null) {
                docKey = converted.getValue(idFieldName);
            } else {
                docKey = converted.getId();
            }
            context.write((Object)docKey, (Object)converted);
        }
    }

    class ImportJSONThread
    extends BaseImportJSONThread {
        ImportJSONThread(int id, Path t, Configuration config) {
            super(id, t, config);
        }

        @Override
        public void run() {
            FSDataInputStream inputStream = null;
            try {
                FileSystem fs = this.filePath.getFileSystem(this.config);
                inputStream = fs.open(this.filePath);
                DocumentStream documentStream = Json.newDocumentStream((InputStream)inputStream);
                Iterator iter = documentStream.iterator();
                String key = null;
                BulkLoadRecordWriter writer = null;
                if (bulkLoad) {
                    writer = new BulkLoadRecordWriter(ImportJSON.this.getConf(), new Path(dstPath));
                    this.importBulkload(iter, (RecordWriter)writer);
                    writer.close(null);
                    isSuccess = true;
                    return;
                }
                MapRDBTableImpl tab = new MapRDBTableImpl(new Path(dstPath), this.config);
                tab.setPrivateOption(MapRDBTableImpl.TablePrivateOption.PRESERVE_TIMESTAMP, true);
                int recordCount = 0;
                while (iter.hasNext()) {
                    Document docValue = (Document)iter.next();
                    if (keyField != null) {
                        key = docValue.getString(keyField);
                        tab.insertOrReplace(key, docValue);
                    } else {
                        tab.insertOrReplace(docValue);
                    }
                    ++recordCount;
                }
                LOG.debug("recordCount " + Integer.toString(recordCount));
                tab.flush();
                tab.close();
            }
            catch (Exception io) {
                LOG.error("importJSON encountered an exception: " + io.getMessage());
                io.printStackTrace();
                isSuccess = false;
                return;
            }
            isSuccess = true;
        }

        void importBulkload(Iterator<Document> iter, RecordWriter writer) {
            while (iter.hasNext()) {
                Document docValue = iter.next();
                Value key = null;
                key = keyField != null ? docValue.getValue(keyField) : docValue.getValue("_id");
                try {
                    writer.write((Object)key, (Object)docValue);
                }
                catch (Exception io) {
                    LOG.error("importJSON encountered an exception in bulkload mode: " + io.getMessage());
                    io.printStackTrace();
                }
            }
        }
    }

    /**
     * Common state for the per-file import tasks: the task's id, the file it reads and the
     * configuration it uses. Subclasses supply {@code run()}.
     */
    abstract class BaseImportJSONThread
    implements Runnable {
        /** Identifier given to this task by its creator. */
        protected int myid;
        /** File this task reads. */
        protected Path filePath;
        /** Configuration used by this task. */
        protected Configuration config;

        /**
         * Stores the task's id, input file and configuration.
         *
         * @param id       identifier of the task
         * @param filePath file to import
         * @param config   configuration to use
         */
        protected BaseImportJSONThread(int id, Path filePath, Configuration config) {
            this.config = config;
            this.filePath = filePath;
            this.myid = id;
        }
    }
}

