/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.mapreduce.tools;

import com.google.common.collect.BiMap;
import com.mapr.db.DBDocument;
import com.mapr.db.impl.IdCodec;
import com.mapr.db.impl.MapRDBTableImpl;
import com.mapr.db.mapreduce.BulkLoadOutputFormat;
import com.mapr.db.mapreduce.BulkLoadRecordWriter;
import com.mapr.db.mapreduce.ByteBufWritableComparable;
import com.mapr.db.mapreduce.DBDocumentSerialization;
import com.mapr.db.mapreduce.JsonBulkLoadReducer;
import com.mapr.db.mapreduce.JsonImportMapper;
import com.mapr.db.mapreduce.TableOutputFormat;
import com.mapr.db.rowcol.DBDocumentImpl;
import com.mapr.db.rowcol.SequenceFileRowColCodec;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.ojai.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line tool that copies documents from binary sequence files into a MapR-DB table.
 *
 * <p>Two execution paths exist: a MapReduce job ({@link #createSubmittableJob}) and an
 * in-process, multi-threaded copy ({@link #run_NonMR()}). Within each path the documents are
 * written either through the bulk-load writer or through regular table inserts, depending on
 * the {@code -bulkload} option (default {@code true}).
 *
 * <p>Options are kept in static fields, so only one import should be run per JVM at a time.
 */
public class Import extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Import.class);
    private static final String NAME = "Import";
    /** Size of the worker pool used by the non-MapReduce path. */
    private static final int NUM_THREADS = 16;

    private static String srcPath;
    private static String dstPath;
    private static boolean bulkLoad = true;
    private static boolean mapReduce = true;
    /**
     * Outcome of the non-MapReduce path. Set to {@code true} once before the workers start;
     * workers only ever clear it, so a single failure can never be masked by a later success.
     */
    private static volatile boolean isSuccess = false;

    /**
     * Builds the MapReduce job that reads the sequence files at {@code srcPath} and writes
     * them to the table at {@code dstPath}.
     *
     * <p>Without bulk load the job is map-only and writes through {@link TableOutputFormat}.
     * With bulk load a {@link TotalOrderPartitioner} is configured from the destination
     * table's split points and one reducer is used per split point.
     *
     * @param conf      base configuration for the job
     * @param otherArgs remaining command-line arguments (currently unused)
     * @return the configured, not yet submitted job
     * @throws IOException if the partition file cannot be written or its path is not a valid URI
     */
    private static Job createSubmittableJob(Configuration conf, String[] otherArgs) throws IOException {
        Job job = Job.getInstance(conf, "Import_" + dstPath);
        job.setJarByClass(Import.class);
        SequenceFileInputFormat.setInputPaths(job, srcPath);
        job.setInputFormatClass(SequenceFileInputFormat.class);

        Configuration config = job.getConfiguration();
        config.setStrings("io.serializations", conf.get("io.serializations"), DBDocumentSerialization.class.getName());

        job.setMapperClass(JsonImportMapper.class);
        job.setMapOutputKeyClass(ByteBufWritableComparable.class);
        job.setMapOutputValueClass(DBDocumentImpl.class);
        job.setOutputKeyClass(ByteBufWritableComparable.class);
        job.setOutputValueClass(DBDocumentImpl.class);

        int numReduceTasks;
        if (bulkLoad) {
            job.setOutputFormatClass(BulkLoadOutputFormat.class);
            config.set("maprdb.mapreduce.bulkloadrecordwriter.outputTable", dstPath);

            job.setPartitionerClass(TotalOrderPartitioner.class);
            // A random suffix keeps concurrent imports from clobbering each other's partition file.
            Path partitionFile = new Path(config.get("hadoop.tmp.dir"), "partitions_" + UUID.randomUUID());
            TotalOrderPartitioner.setPartitionFile(config, partitionFile);

            List<ByteBufWritableComparable> partitionSplitPoints = BulkLoadOutputFormat.getPartitionSplitPoints(dstPath);
            Import.writePartitions(job, partitionFile, partitionSplitPoints);
            job.setReducerClass(JsonBulkLoadReducer.class);
            numReduceTasks = partitionSplitPoints.size();

            try {
                job.addCacheFile(new URI(partitionFile.toString()));
            } catch (URISyntaxException e) {
                // Submitting without the partition file in the cache would only fail later and
                // less clearly, so surface the problem now instead of printing and continuing.
                throw new IOException("Invalid partition file URI: " + partitionFile, e);
            }
        } else {
            job.setOutputFormatClass(TableOutputFormat.class);
            config.set("maprdb.mapred.outputtable", dstPath);
            numReduceTasks = 0;
        }
        job.setNumReduceTasks(numReduceTasks);
        return job;
    }

    /**
     * Writes the partition file consumed by {@link TotalOrderPartitioner}.
     *
     * <p>The keys are sorted, the smallest one (which must be empty) is dropped, and the rest
     * are appended to a sequence file at {@code partitionsPath} with {@link NullWritable} values.
     *
     * @param job            job whose configuration and map output key class are used
     * @param partitionsPath destination of the partition file
     * @param startKeys      split points of the destination table
     * @throws IOException              if the file cannot be created or written
     * @throws IllegalArgumentException if {@code startKeys} is empty or its smallest key is not empty
     */
    private static void writePartitions(Job job, Path partitionsPath, List<ByteBufWritableComparable> startKeys) throws IOException {
        if (startKeys.isEmpty()) {
            throw new IllegalArgumentException("No regions passed");
        }
        TreeSet<ByteBufWritableComparable> sorted = new TreeSet<>(startKeys);
        ByteBufWritableComparable first = sorted.first();
        if (first.getByteBuf().limit() != 0) {
            throw new IllegalArgumentException("First region of table should have empty start key. Instead has: " + first.toString());
        }
        sorted.remove(first);

        Configuration conf = job.getConfiguration();
        FileSystem fs = partitionsPath.getFileSystem(conf);
        try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath, job.getMapOutputKeyClass(), NullWritable.class)) {
            for (ByteBufWritableComparable startKey : sorted) {
                // Explicit casts select the Writable overload of append.
                writer.append((Writable) startKey, (Writable) NullWritable.get());
            }
        }
    }

    /** Hook invoked after a successful MapReduce import; currently does nothing. */
    public static void doCleanup() {
    }

    /**
     * Prints an optional error message followed by the usage text to standard error and
     * terminates the JVM with exit status 1.
     *
     * @param errorMsg message to print before the usage text; ignored when {@code null} or empty
     */
    private static void Usage(String errorMsg) {
        if (errorMsg != null && errorMsg.length() > 0) {
            System.err.println("ERROR: " + errorMsg);
        }
        System.err.println("Usage: Import [options] -src <Input binary file/directory path> -dst <MapR-DB Destination table path>\nOptions:\n[-bulkload <true|false>, default is true]\n");
        System.exit(1);
    }

    /**
     * Parses the tool options into the static option fields.
     *
     * <p>Recognised options (case-insensitive), each followed by a value: {@code -src},
     * {@code -dst}, {@code -bulkload}, {@code -mapreduce}. Unknown options, options without a
     * value, and a missing {@code -src}/{@code -dst} all end in {@link #Usage(String)}.
     *
     * @param args arguments left over after generic Hadoop option parsing
     */
    private static void ParseArgs(String[] args) throws Exception {
        for (int i = 0; i < args.length; ++i) {
            String option = args[i];
            boolean known = option.equalsIgnoreCase("-src")
                    || option.equalsIgnoreCase("-dst")
                    || option.equalsIgnoreCase("-bulkload")
                    || option.equalsIgnoreCase("-mapreduce");
            if (!known) {
                System.err.println("PARSE ARGS: " + option);
                Import.Usage(null);
                return;
            }
            if (i + 1 >= args.length) {
                // Previously this case threw ArrayIndexOutOfBoundsException.
                Import.Usage("missing value for option " + option);
                return;
            }
            String value = args[++i];
            if (option.equalsIgnoreCase("-src")) {
                srcPath = value;
            } else if (option.equalsIgnoreCase("-dst")) {
                dstPath = value;
            } else if (option.equalsIgnoreCase("-bulkload")) {
                bulkLoad = Boolean.parseBoolean(value);
            } else {
                mapReduce = Boolean.parseBoolean(value);
            }
        }
        // NOTE(review): -mapreduce is parsed above but unconditionally overridden here, so the
        // tool always takes the non-MapReduce path. Kept as in the original; confirm whether
        // this is an intentional kill-switch before removing it.
        mapReduce = false;
        if (srcPath == null || dstPath == null) {
            Import.Usage("missing -src or -dst.");
        }
    }

    /**
     * Imports the input files in-process using a fixed pool of {@value #NUM_THREADS} worker
     * threads, one task per file, and blocks until all tasks have finished.
     *
     * <p>{@code srcPath} is expanded as a glob. Matched directories contribute their direct,
     * non-directory children whose names do not start with {@code "_"}. The result is
     * published through {@code isSuccess}: it is {@code false} when the path does not exist,
     * when no file was found, or when any worker failed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void run_NonMR() throws Exception {
        Configuration config = this.getConf();
        Path inputPath = new Path(srcPath);
        FileSystem fs = inputPath.getFileSystem(config);

        FileStatus[] matches = fs.globStatus(inputPath);
        if (matches == null) {
            System.err.println("Given path " + srcPath + " does not exist. No data to copy.");
            isSuccess = false;
            return;
        }

        List<Path> filesToBeProcessed = new ArrayList<>();
        for (FileStatus match : matches) {
            if (!match.isDirectory()) {
                filesToBeProcessed.add(match.getPath());
                continue;
            }
            for (FileStatus child : fs.listStatus(match.getPath())) {
                if (child.isDirectory() || child.getPath().getName().startsWith("_")) {
                    continue;
                }
                filesToBeProcessed.add(child.getPath());
            }
        }
        if (filesToBeProcessed.isEmpty()) {
            // Same outcome as before (nothing imported is reported as failure), now explicit.
            LOG.warn("No input files found for {}", srcPath);
            isSuccess = false;
            return;
        }

        // Register the document serialization once, before any worker reads the shared config,
        // instead of having every worker append it again concurrently.
        config.setStrings("io.serializations", config.get("io.serializations"), DBDocumentSerialization.class.getName());

        // Workers only clear this flag, so it ends up true only if every file succeeded.
        isSuccess = true;
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        for (int i = 0; i < filesToBeProcessed.size(); ++i) {
            executor.execute(new ImporterThread(i, filesToBeProcessed.get(i), config));
        }
        executor.shutdown();
        try {
            // Block instead of spinning on isTerminated().
            while (!executor.awaitTermination(1L, TimeUnit.MINUTES)) {
                LOG.debug("Waiting for import workers to finish");
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            isSuccess = false;
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * Tool entry point: parses the arguments and runs the import.
     *
     * @param args command-line arguments, including generic Hadoop options
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(this.getConf(), args).getRemainingArgs();
        if (otherArgs.length < 2) {
            Import.Usage("Wrong number of arguments: " + otherArgs.length);
            return 1;
        }
        Import.ParseArgs(otherArgs);
        if (!mapReduce) {
            this.run_NonMR();
            return isSuccess ? 0 : 1;
        }
        Job job = Import.createSubmittableJob(this.getConf(), otherArgs);
        boolean isJobSuccessful = job.waitForCompletion(true);
        if (isJobSuccessful) {
            Import.doCleanup();
        }
        // Return the status to ToolRunner; main() turns it into the process exit code.
        return isJobSuccessful ? 0 : 1;
    }

    /**
     * Runs the tool through {@link ToolRunner} and exits with its status, or with 1 if an
     * exception escapes.
     */
    public static void main(String[] args) throws Exception {
        int ret;
        try {
            ret = ToolRunner.run(new Configuration(), new Import(), args);
        } catch (Exception e) {
            // Standard error is this CLI's user-facing channel, so the trace stays there.
            e.printStackTrace();
            ret = 1;
        }
        System.exit(ret);
    }

    /**
     * Worker that imports a single sequence file into the destination table.
     *
     * <p>On any failure the worker logs the error and clears {@code isSuccess}; it never
     * sets the flag to {@code true}.
     */
    class ImporterThread extends BaseImporterThread {
        ImporterThread(int id, Path t, Configuration config) {
            super(id, t, config);
        }

        /**
         * Reads every record of the file and writes it to the table, through the bulk-load
         * writer when {@code bulkLoad} is set and through {@code insertOrReplace} otherwise.
         * The reader and the table handle are always closed.
         */
        @Override
        public void run() {
            SequenceFile.Reader reader = null;
            MapRDBTableImpl tab = null;
            try {
                FileSystem fs = this.filePath.getFileSystem(this.config);
                reader = new SequenceFile.Reader(fs, this.filePath, this.config);
                tab = new MapRDBTableImpl(new Path(dstPath), this.config);
                tab.setPrivateOption(MapRDBTableImpl.TablePrivateOption.PRESERVE_TIMESTAMP, true);

                if (bulkLoad) {
                    BulkLoadRecordWriter writer = new BulkLoadRecordWriter(Import.this.getConf(), new Path(dstPath));
                    try {
                        this.importBulkload(reader, writer, tab);
                    } finally {
                        writer.close(null);
                    }
                    return;
                }

                ByteBufWritableComparable key = new ByteBufWritableComparable();
                ByteBufWritableComparable value = new ByteBufWritableComparable();
                int recordCount = 0;
                // Explicit Writable casts select the Writable overloads of next/getCurrentValue.
                while (reader.next((Writable) key)) {
                    reader.getCurrentValue((Writable) value);
                    DBDocument docValue = SequenceFileRowColCodec.decode((ByteBuffer) value.getByteBuf(), (BiMap) tab.idPathMap());
                    tab.insertOrReplace(IdCodec.decode((ByteBuffer) key.getByteBuf()), (Document) docValue);
                    ++recordCount;
                }
                LOG.debug("recordCount {}", recordCount);
                tab.flush();
            } catch (Exception io) {
                LOG.error("Import encountered an exception: " + io.getMessage(), io);
                isSuccess = false;
            } finally {
                if (tab != null) {
                    try {
                        tab.close();
                    } catch (Exception e) {
                        // A failed close may mean buffered writes were lost; count it as failure.
                        LOG.error("Failed to close table " + dstPath, e);
                        isSuccess = false;
                    }
                }
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (Exception e) {
                        LOG.warn("Failed to close reader for " + this.filePath, e);
                    }
                }
            }
        }

        /**
         * Copies every record from {@code reader} to {@code writer}, decoding values with the
         * table's id/path map. Failures are logged and recorded by clearing {@code isSuccess};
         * they are not rethrown. Neither the reader nor the writer is closed here.
         *
         * @param reader source sequence file
         * @param writer bulk-load sink receiving key/document pairs
         * @param tab    table handle supplying the id/path map used for decoding
         */
        void importBulkload(SequenceFile.Reader reader, RecordWriter writer, MapRDBTableImpl tab) {
            ByteBufWritableComparable key = new ByteBufWritableComparable();
            ByteBufWritableComparable value = new ByteBufWritableComparable();
            try {
                while (reader.next((Writable) key)) {
                    reader.getCurrentValue((Writable) value);
                    DBDocument docValue = SequenceFileRowColCodec.decode((ByteBuffer) value.getByteBuf(), (BiMap) tab.idPathMap());
                    writer.write((Object) key, (Object) docValue);
                }
            } catch (Exception io) {
                LOG.error("Import encountered an exception in bulkload mode: " + io.getMessage(), io);
                // Previously the caller went on to report success after this point.
                isSuccess = false;
            }
        }
    }

    /** Common state for per-file import workers: the worker id, the file, and the configuration. */
    abstract class BaseImporterThread implements Runnable {
        protected Path filePath;
        protected int myid;
        protected Configuration config;

        protected BaseImporterThread(int id, Path filePath, Configuration config) {
            this.myid = id;
            this.filePath = filePath;
            this.config = config;
        }
    }
}

