/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnectable;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VerifyReplication
extends Configured
implements Tool {
    private static final Log LOG = LogFactory.getLog(VerifyReplication.class);
    public static final String NAME = "verifyrep";
    static long startTime = 0L;
    static long endTime = Long.MAX_VALUE;
    static int versions = -1;
    static String tableName = null;
    static String families = null;
    static String peerId = null;

    private static String getPeerQuorumAddress(Configuration conf) throws IOException {
        ZooKeeperWatcher localZKW = null;
        ReplicationPeerZKImpl peer = null;
        try {
            localZKW = new ZooKeeperWatcher(conf, "VerifyReplication", new Abortable(){

                @Override
                public void abort(String why, Throwable e) {
                }

                @Override
                public boolean isAborted() {
                    return false;
                }
            });
            ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
            rp.init();
            Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
            if (pair == null) {
                throw new IOException("Couldn't get peer conf!");
            }
            Configuration peerConf = rp.getPeerConf(peerId).getSecond();
            String string = ZKUtil.getZooKeeperClusterKey(peerConf);
            return string;
        }
        catch (ReplicationException e) {
            throw new IOException("An error occured while trying to connect to the remove peer cluster", e);
        }
        finally {
            if (peer != null) {
                peer.close();
            }
            if (localZKW != null) {
                localZKW.close();
            }
        }
    }

    public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
        if (!VerifyReplication.doCommandLine(args)) {
            return null;
        }
        if (!conf.getBoolean("hbase.replication", true)) {
            throw new IOException("Replication needs to be enabled to verify it.");
        }
        conf.set("verifyrep.peerId", peerId);
        conf.set("verifyrep.tableName", tableName);
        conf.setLong("verifyrep.startTime", startTime);
        conf.setLong("verifyrep.endTime", endTime);
        if (families != null) {
            conf.set("verifyrep.families", families);
        }
        String peerQuorumAddress = VerifyReplication.getPeerQuorumAddress(conf);
        conf.set("verifyrep.peerQuorumAddress", peerQuorumAddress);
        LOG.info((Object)("Peer Quorum Address: " + peerQuorumAddress));
        Job job = new Job(conf, "verifyrep_" + tableName);
        job.setJarByClass(VerifyReplication.class);
        Scan scan = new Scan();
        scan.setTimeRange(startTime, endTime);
        if (versions >= 0) {
            scan.setMaxVersions(versions);
        }
        if (families != null) {
            String[] fams;
            for (String fam : fams = families.split(",")) {
                scan.addFamily(Bytes.toBytes(fam));
            }
        }
        TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
        TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(0);
        return job;
    }

    private static boolean doCommandLine(String[] args) {
        if (args.length < 2) {
            VerifyReplication.printUsage(null);
            return false;
        }
        try {
            for (int i = 0; i < args.length; ++i) {
                String cmd = args[i];
                if (cmd.equals("-h") || cmd.startsWith("--h")) {
                    VerifyReplication.printUsage(null);
                    return false;
                }
                String startTimeArgKey = "--starttime=";
                if (cmd.startsWith("--starttime=")) {
                    startTime = Long.parseLong(cmd.substring("--starttime=".length()));
                    continue;
                }
                String endTimeArgKey = "--endtime=";
                if (cmd.startsWith("--endtime=")) {
                    endTime = Long.parseLong(cmd.substring("--endtime=".length()));
                    continue;
                }
                String versionsArgKey = "--versions=";
                if (cmd.startsWith("--versions=")) {
                    versions = Integer.parseInt(cmd.substring("--versions=".length()));
                    continue;
                }
                String familiesArgKey = "--families=";
                if (cmd.startsWith("--families=")) {
                    families = cmd.substring("--families=".length());
                    continue;
                }
                if (i == args.length - 2) {
                    peerId = cmd;
                }
                if (i != args.length - 1) continue;
                tableName = cmd;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            VerifyReplication.printUsage("Can't start because " + e.getMessage());
            return false;
        }
        return true;
    }

    private static void printUsage(String errorMsg) {
        if (errorMsg != null && errorMsg.length() > 0) {
            System.err.println("ERROR: " + errorMsg);
        }
        System.err.println("Usage: verifyrep [--starttime=X] [--stoptime=Y] [--families=A] <peerid> <tablename>");
        System.err.println();
        System.err.println("Options:");
        System.err.println(" starttime    beginning of the time range");
        System.err.println("              without endtime means from starttime to forever");
        System.err.println(" endtime      end of the time range");
        System.err.println(" versions     number of cell versions to verify");
        System.err.println(" families     comma-separated list of families to copy");
        System.err.println();
        System.err.println("Args:");
        System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
        System.err.println(" tablename    Name of the table to verify");
        System.err.println();
        System.err.println("Examples:");
        System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
        System.err.println(" $ bin/hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
    }

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = VerifyReplication.createSubmittableJob(conf, args);
        if (job != null) {
            return job.waitForCompletion(true) ? 0 : 1;
        }
        return 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run((Configuration)HBaseConfiguration.create(), (Tool)new VerifyReplication(), (String[])args);
        System.exit(res);
    }

    public static class Verifier
    extends TableMapper<ImmutableBytesWritable, Put> {
        private ResultScanner replicatedScanner;
        private Result currentCompareRowInPeerTable;
        private Table replicatedTable;

        public void map(ImmutableBytesWritable row, final Result value, Mapper.Context context) throws IOException {
            if (this.replicatedScanner == null) {
                Configuration conf = context.getConfiguration();
                final Scan scan = new Scan();
                scan.setCaching(conf.getInt("hbase.mapreduce.scan.cachedrows", 1));
                long startTime = conf.getLong("verifyrep.startTime", 0L);
                long endTime = conf.getLong("verifyrep.endTime", Long.MAX_VALUE);
                String families = conf.get("verifyrep.families", null);
                if (families != null) {
                    String[] fams;
                    for (String fam : fams = families.split(",")) {
                        scan.addFamily(Bytes.toBytes(fam));
                    }
                }
                scan.setTimeRange(startTime, endTime);
                if (versions >= 0) {
                    scan.setMaxVersions(versions);
                }
                final TableSplit tableSplit = (TableSplit)context.getInputSplit();
                HConnectionManager.execute(new HConnectable<Void>(conf){

                    @Override
                    public Void connect(HConnection conn) throws IOException {
                        String zkClusterKey = this.conf.get("verifyrep.peerQuorumAddress");
                        Configuration peerConf = HBaseConfiguration.create(this.conf);
                        ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
                        TableName tableName = TableName.valueOf(this.conf.get("verifyrep.tableName"));
                        replicatedTable = new HTable(peerConf, tableName);
                        scan.setStartRow(value.getRow());
                        scan.setStopRow(tableSplit.getEndRow());
                        replicatedScanner = replicatedTable.getScanner(scan);
                        return null;
                    }
                });
                this.currentCompareRowInPeerTable = this.replicatedScanner.next();
            }
            while (true) {
                if (this.currentCompareRowInPeerTable == null) {
                    this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
                    break;
                }
                int rowCmpRet = Bytes.compareTo(value.getRow(), this.currentCompareRowInPeerTable.getRow());
                if (rowCmpRet == 0) {
                    try {
                        Result.compareResults(value, this.currentCompareRowInPeerTable);
                        context.getCounter((Enum)Counters.GOODROWS).increment(1L);
                    }
                    catch (Exception e) {
                        this.logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
                        LOG.error((Object)("Exception while comparing row : " + e));
                    }
                    this.currentCompareRowInPeerTable = this.replicatedScanner.next();
                    break;
                }
                if (rowCmpRet < 0) {
                    this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
                    break;
                }
                this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, this.currentCompareRowInPeerTable);
                this.currentCompareRowInPeerTable = this.replicatedScanner.next();
            }
        }

        private void logFailRowAndIncreaseCounter(Mapper.Context context, Counters counter, Result row) {
            context.getCounter((Enum)counter).increment(1L);
            context.getCounter((Enum)Counters.BADROWS).increment(1L);
            LOG.error((Object)(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow())));
        }

        protected void cleanup(Mapper.Context context) {
            if (this.replicatedScanner != null) {
                try {
                    while (this.currentCompareRowInPeerTable != null) {
                        this.logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, this.currentCompareRowInPeerTable);
                        this.currentCompareRowInPeerTable = this.replicatedScanner.next();
                    }
                }
                catch (Exception e) {
                    LOG.error((Object)"fail to scan peer table in cleanup", (Throwable)e);
                }
                finally {
                    this.replicatedScanner.close();
                    this.replicatedScanner = null;
                }
            }
            if (this.replicatedTable != null) {
                TableName tableName = this.replicatedTable.getName();
                try {
                    this.replicatedTable.close();
                }
                catch (IOException ioe) {
                    LOG.warn((Object)("Exception closing " + tableName), (Throwable)ioe);
                }
            }
        }

        public static enum Counters {
            GOODROWS,
            BADROWS,
            ONLY_IN_SOURCE_TABLE_ROWS,
            ONLY_IN_PEER_TABLE_ROWS,
            CONTENT_DIFFERENT_ROWS;

        }
    }
}

