/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.hcatalog.streaming;

import java.util.Arrays;
import java.util.Random;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.RecordWriter;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class StreamingIntegrationTester {
    private static final Log LOG = LogFactory.getLog((String)StreamingIntegrationTester.class.getName());
    private String db;
    private String table;
    private String uri;
    private int txnsPerBatch;
    private int writers;
    private int batches;
    private int recordsPerTxn;
    private int frequency;
    private float abortPct;
    private String[] partVals;
    private String[] cols;
    private String[] types;
    private boolean pause;

    public static void main(String[] args) {
        try {
            LogUtils.initHiveLog4j();
        }
        catch (LogUtils.LogInitializationException e) {
            System.err.println("Unable to initialize log4j " + StringUtils.stringifyException((Throwable)e));
            System.exit(-1);
        }
        Options options = new Options();
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"abort-pct");
        OptionBuilder.withDescription((String)"Percentage of transactions to abort, defaults to 5");
        OptionBuilder.withLongOpt((String)"abortpct");
        options.addOption(OptionBuilder.create((char)'a'));
        OptionBuilder.hasArgs();
        OptionBuilder.withArgName((String)"column-names");
        OptionBuilder.withDescription((String)"column names of table to write to");
        OptionBuilder.withLongOpt((String)"columns");
        OptionBuilder.withValueSeparator((char)',');
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create((char)'c'));
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"database");
        OptionBuilder.withDescription((String)"Database of table to write to");
        OptionBuilder.withLongOpt((String)"database");
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create((char)'d'));
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"frequency");
        OptionBuilder.withDescription((String)"How often to commit a transaction, in seconds, defaults to 1");
        OptionBuilder.withLongOpt((String)"frequency");
        options.addOption(OptionBuilder.create((char)'f'));
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"iterations");
        OptionBuilder.withDescription((String)"Number of batches to write, defaults to 10");
        OptionBuilder.withLongOpt((String)"num-batches");
        options.addOption(OptionBuilder.create((char)'i'));
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"metastore-uri");
        OptionBuilder.withDescription((String)"URI of Hive metastore");
        OptionBuilder.withLongOpt((String)"metastore-uri");
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create((char)'m'));
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"num_transactions");
        OptionBuilder.withDescription((String)"Number of transactions per batch, defaults to 100");
        OptionBuilder.withLongOpt((String)"num-txns");
        options.addOption(OptionBuilder.create((char)'n'));
        OptionBuilder.hasArgs();
        OptionBuilder.withArgName((String)"partition-values");
        OptionBuilder.withDescription((String)"partition values, must be provided in order of partition columns, if not provided table is assumed to not be partitioned");
        OptionBuilder.withLongOpt((String)"partition");
        OptionBuilder.withValueSeparator((char)',');
        options.addOption(OptionBuilder.create((char)'p'));
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"records-per-transaction");
        OptionBuilder.withDescription((String)"records to write in each transaction, defaults to 100");
        OptionBuilder.withLongOpt((String)"records-per-txn");
        OptionBuilder.withValueSeparator((char)',');
        options.addOption(OptionBuilder.create((char)'r'));
        OptionBuilder.hasArgs();
        OptionBuilder.withArgName((String)"column-types");
        OptionBuilder.withDescription((String)"column types, valid values are string, int, float, decimal, date, datetime");
        OptionBuilder.withLongOpt((String)"schema");
        OptionBuilder.withValueSeparator((char)',');
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create((char)'s'));
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"table");
        OptionBuilder.withDescription((String)"Table to write to");
        OptionBuilder.withLongOpt((String)"table");
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create((char)'t'));
        OptionBuilder.hasArg();
        OptionBuilder.withArgName((String)"num-writers");
        OptionBuilder.withDescription((String)"Number of writers to create, defaults to 2");
        OptionBuilder.withLongOpt((String)"writers");
        options.addOption(OptionBuilder.create((char)'w'));
        OptionBuilder.hasArg((boolean)false);
        OptionBuilder.withArgName((String)"pause");
        OptionBuilder.withDescription((String)"Wait on keyboard input after commit & batch close. default: disabled");
        OptionBuilder.withLongOpt((String)"pause");
        options.addOption(OptionBuilder.create((char)'x'));
        GnuParser parser = new GnuParser();
        CommandLine cmdline = null;
        try {
            cmdline = parser.parse(options, args);
        }
        catch (ParseException e) {
            System.err.println(e.getMessage());
            StreamingIntegrationTester.usage(options);
        }
        boolean pause = cmdline.hasOption('x');
        String db = cmdline.getOptionValue('d');
        String table = cmdline.getOptionValue('t');
        String uri = cmdline.getOptionValue('m');
        int txnsPerBatch = Integer.valueOf(cmdline.getOptionValue('n', "100"));
        int writers = Integer.valueOf(cmdline.getOptionValue('w', "2"));
        int batches = Integer.valueOf(cmdline.getOptionValue('i', "10"));
        int recordsPerTxn = Integer.valueOf(cmdline.getOptionValue('r', "100"));
        int frequency = Integer.valueOf(cmdline.getOptionValue('f', "1"));
        int ap = Integer.valueOf(cmdline.getOptionValue('a', "5"));
        float abortPct = (float)ap / 100.0f;
        String[] partVals = cmdline.getOptionValues('p');
        String[] cols = cmdline.getOptionValues('c');
        String[] types = cmdline.getOptionValues('s');
        StreamingIntegrationTester sit = new StreamingIntegrationTester(db, table, uri, txnsPerBatch, writers, batches, recordsPerTxn, frequency, abortPct, partVals, cols, types, pause);
        sit.go();
    }

    static void usage(Options options) {
        HelpFormatter hf = new HelpFormatter();
        hf.printHelp(74, "sit [options]", "Usage: ", options, "");
        System.exit(-1);
    }

    private StreamingIntegrationTester(String db, String table, String uri, int txnsPerBatch, int writers, int batches, int recordsPerTxn, int frequency, float abortPct, String[] partVals, String[] cols, String[] types, boolean pause) {
        this.db = db;
        this.table = table;
        this.uri = uri;
        this.txnsPerBatch = txnsPerBatch;
        this.writers = writers;
        this.batches = batches;
        this.recordsPerTxn = recordsPerTxn;
        this.frequency = frequency;
        this.abortPct = abortPct;
        this.partVals = partVals;
        this.cols = cols;
        this.types = types;
        this.pause = pause;
    }

    private void go() {
        HiveEndPoint endPoint = null;
        try {
            endPoint = this.partVals == null ? new HiveEndPoint(this.uri, this.db, this.table, null) : new HiveEndPoint(this.uri, this.db, this.table, Arrays.asList(this.partVals));
            for (int i = 0; i < this.writers; ++i) {
                Writer w = new Writer(endPoint, i, this.txnsPerBatch, this.batches, this.recordsPerTxn, this.frequency, this.abortPct, this.cols, this.types, this.pause);
                w.start();
            }
        }
        catch (Throwable t) {
            System.err.println("Caught exception while testing: " + StringUtils.stringifyException((Throwable)t));
        }
    }

    private static class Writer
    extends Thread {
        private HiveEndPoint endPoint;
        private int txnsPerBatch;
        private int batches;
        private int writerNumber;
        private int recordsPerTxn;
        private int frequency;
        private float abortPct;
        private String[] cols;
        private String[] types;
        private boolean pause;
        private Random rand;

        Writer(HiveEndPoint endPoint, int writerNumber, int txnsPerBatch, int batches, int recordsPerTxn, int frequency, float abortPct, String[] cols, String[] types, boolean pause) {
            this.endPoint = endPoint;
            this.txnsPerBatch = txnsPerBatch;
            this.batches = batches;
            this.writerNumber = writerNumber;
            this.recordsPerTxn = recordsPerTxn;
            this.frequency = frequency * 1000;
            this.abortPct = abortPct;
            this.cols = cols;
            this.types = types;
            this.pause = pause;
            this.rand = new Random();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try (StreamingConnection conn = null;){
                conn = this.endPoint.newConnection(true);
                DelimitedInputWriter writer = new DelimitedInputWriter(this.cols, ",", this.endPoint);
                for (int i = 0; i < this.batches; ++i) {
                    long start = System.currentTimeMillis();
                    LOG.info((Object)("Starting batch " + i));
                    TransactionBatch batch = conn.fetchTransactionBatch(this.txnsPerBatch, (RecordWriter)writer);
                    try {
                        while (batch.remainingTransactions() > 0) {
                            batch.beginNextTransaction();
                            for (int j = 0; j < this.recordsPerTxn; ++j) {
                                batch.write(this.generateRecord(this.cols, this.types));
                            }
                            if (this.rand.nextFloat() < this.abortPct) {
                                batch.abort();
                            } else {
                                batch.commit();
                            }
                            if (!this.pause) continue;
                            System.out.println("Writer " + this.writerNumber + " committed... press Enter to continue. " + Thread.currentThread().getId());
                            System.in.read();
                        }
                        long end = System.currentTimeMillis();
                        if (end - start >= (long)this.frequency) continue;
                        Thread.sleep((long)this.frequency - (end - start));
                        continue;
                    }
                    finally {
                        batch.close();
                        if (this.pause) {
                            System.out.println("Writer " + this.writerNumber + " has closed a Batch.. press Enter to continue. " + Thread.currentThread().getId());
                            System.in.read();
                        }
                    }
                }
            }
        }

        private byte[] generateRecord(String[] cols, String[] types) {
            StringBuilder buf = new StringBuilder();
            for (int i = 0; i < types.length; ++i) {
                buf.append(this.generateColumn(types[i]));
                buf.append(",");
            }
            return buf.toString().getBytes();
        }

        private String generateColumn(String type) {
            if ("string".equals(type.toLowerCase())) {
                return "When that Aprilis with his showers swoot";
            }
            if (type.toLowerCase().startsWith("int")) {
                return "42";
            }
            if (type.toLowerCase().startsWith("dec") || type.toLowerCase().equals("float")) {
                return "3.141592654";
            }
            if (type.toLowerCase().equals("datetime")) {
                return "2014-03-07 15:33:22";
            }
            if (type.toLowerCase().equals("date")) {
                return "1955-11-12";
            }
            throw new RuntimeException("Sorry, I don't know the type " + type);
        }
    }
}

