/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={LargeTests.class})
public class TestHRegionServerBulkLoad {
    /** Logger for this test; the nested worker classes log through it as well. */
    static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
    /** Testing utility behind the run; {@code setConf} replaces it with one built on another configuration. */
    private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
    /** Configuration read once from the initial {@code UTIL}; it is final and never re-read. */
    private static final Configuration conf = UTIL.getConfiguration();
    /** The single qualifier written into every HFile and read back by the scanners. */
    private static final byte[] QUAL = Bytes.toBytes("qual");
    /** Column family count; sizes {@code families}. */
    private static final int NUM_CFS = 10;
    /** Block size handed to the {@code HFileContextBuilder} in {@code createHFile}. */
    public static int BLOCKSIZE = 64 * 1024;
    /** Compression algorithm handed to the {@code HFileContextBuilder} in {@code createHFile}. */
    public static Compression.Algorithm COMPRESSION = Compression.Algorithm.NONE;
    /** Family names as bytes; the slots are filled by the static initializer. */
    private static final byte[][] families = new byte[NUM_CFS][];

    /**
     * Builds the row key for row number {@code i}.
     *
     * @param i row number
     * @return the bytes of {@code i} formatted with the pattern {@code "row_%08d"}
     */
    static byte[] rowkey(int i) {
        final String keyText = String.format("row_%08d", i);
        return Bytes.toBytes(keyText);
    }

    /**
     * Builds the name of column family number {@code i}.
     *
     * @param i family index
     * @return {@code i} formatted with the pattern {@code "family_%04d"}, e.g. {@code family_0007}
     */
    static String family(int i) {
        final String familyName = String.format("family_%04d", i);
        return familyName;
    }

    /**
     * Writes an HFile at {@code path} containing one cell per row for rows
     * {@code 0 .. numRows-1}. Every cell uses the same family, qualifier, value
     * and timestamp; only the row key (see {@code rowkey}) differs.
     *
     * @param fs        file system the HFile is written to
     * @param path      destination of the HFile
     * @param family    column family of every cell
     * @param qualifier column qualifier of every cell
     * @param value     value stored in every cell
     * @param numRows   number of rows (and cells) to write
     * @throws IOException if creating, appending to, or closing the writer fails
     */
    public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier, byte[] value, int numRows) throws IOException {
        HFileContext fileContext = new HFileContextBuilder()
            .withBlockSize(BLOCKSIZE)
            .withCompression(COMPRESSION)
            .build();
        CacheConfig cacheConfig = new CacheConfig(conf);
        HFile.Writer out = HFile.getWriterFactory(conf, cacheConfig)
            .withPath(fs, path)
            .withFileContext(fileContext)
            .create();
        // A single timestamp is shared by every cell in the file.
        long timestamp = System.currentTimeMillis();
        try {
            int row = 0;
            while (row < numRows) {
                out.append(new KeyValue(TestHRegionServerBulkLoad.rowkey(row), family, qualifier, timestamp, value));
                row++;
            }
        } finally {
            // Close the writer whether or not every append succeeded.
            out.close();
        }
    }

    /**
     * Creates {@code table} with {@code cfs} column families named
     * {@code family(0) .. family(cfs-1)}. If the table already exists the
     * {@link TableExistsException} is logged and otherwise ignored.
     *
     * @param table name of the table to create
     * @param cfs   number of column families to add to the table descriptor
     * @throws IOException if table creation fails for a reason other than the table already existing
     */
    private void setupTable(String table, int cfs) throws IOException {
        try {
            LOG.info("Creating table " + table);
            HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
            // Honour the requested family count instead of a hard-coded 10.
            for (int i = 0; i < cfs; ++i) {
                htd.addFamily(new HColumnDescriptor(TestHRegionServerBulkLoad.family(i)));
            }
            UTIL.getHBaseAdmin().createTable(htd);
        }
        catch (TableExistsException tee) {
            // Deliberate best effort: an existing table is reused as-is.
            LOG.info("Table " + table + " already exists");
        }
    }

    /**
     * Starts a mini cluster via {@code startMiniCluster(1)}, runs the atomic
     * bulk-load scenario against table {@code atomicBulkLoad} for 30000 ms with
     * 50 scanner threads, and always shuts the cluster down afterwards.
     *
     * @throws Exception if cluster start-up, the scenario, or shutdown fails
     */
    @Test
    public void testAtomicBulkLoad() throws Exception {
        final String tableName = "atomicBulkLoad";
        final int runMillis = 30000;
        final int scannerCount = 50;
        UTIL.startMiniCluster(1);
        try {
            runAtomicBulkloadTest(tableName, runMillis, scannerCount);
        } finally {
            // The cluster is torn down even when the scenario throws.
            UTIL.shutdownMiniCluster();
        }
    }

    /**
     * Runs one {@link AtomicHFileLoader} thread alongside {@code numScanners}
     * {@link AtomicScanReader} threads for {@code millisToRun} milliseconds,
     * then stops them and logs the per-thread counters.
     *
     * @param tableName   table to create (if absent) and exercise
     * @param millisToRun how long to let the worker threads run, in milliseconds
     * @param numScanners number of concurrent scanner threads
     * @throws Exception if table setup fails or the test context reports a failure
     */
    void runAtomicBulkloadTest(String tableName, int millisToRun, int numScanners) throws Exception {
        this.setupTable(tableName, NUM_CFS);
        MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(UTIL.getConfiguration());
        AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
        ctx.addThread(loader);
        // The list must be typed: iterating a raw list as AtomicScanReader does not compile.
        List<AtomicScanReader> scanners = Lists.newArrayList();
        for (int i = 0; i < numScanners; ++i) {
            AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
            scanners.add(scanner);
            ctx.addThread(scanner);
        }
        ctx.startThreads();
        ctx.waitFor(millisToRun);
        ctx.stop();
        LOG.info("Loaders:");
        LOG.info("  loaded " + loader.numBulkLoads.get());
        LOG.info("  compations " + loader.numCompactions.get());
        LOG.info("Scanners:");
        for (AtomicScanReader scanner : scanners) {
            LOG.info("  scanned " + scanner.numScans.get());
            LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
        }
    }

    /**
     * Command-line entry point: runs the atomic bulk-load scenario against
     * table {@code atomicTableTest} for 300000 ms with 50 scanner threads,
     * using a testing utility built on {@code HBaseConfiguration.create()}.
     *
     * <p>The JVM is always terminated explicitly when the run ends.
     * NOTE(review): presumably because lingering non-daemon threads would
     * otherwise keep the process alive - confirm. A failure is logged and
     * reported through exit status 1 rather than being discarded with status 0.
     *
     * @param args ignored
     * @throws Exception declared for compatibility; failures are logged and turned into a non-zero exit status
     */
    public static void main(String[] args) throws Exception {
        int exitCode = 0;
        try {
            Configuration c = HBaseConfiguration.create();
            TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
            test.setConf(c);
            test.runAtomicBulkloadTest("atomicTableTest", 300000, 50);
        }
        catch (Throwable t) {
            // Log here: System.exit in the finally block would otherwise drop the stack trace.
            LOG.error("Atomic bulk load run failed", t);
            exitCode = 1;
        }
        finally {
            System.exit(exitCode);
        }
    }

    /**
     * Replaces the shared static testing utility with one built on {@code c}.
     *
     * <p>Only {@code UTIL} is swapped. The static {@code conf} field is final
     * and was read from the previous utility, so it keeps its original value.
     *
     * @param c configuration for the new testing utility
     */
    private void setConf(Configuration c) {
        final HBaseTestingUtility replacement = new HBaseTestingUtility(c);
        UTIL = replacement;
    }

    // Fill every slot of the families array with the byte form of its family name.
    static {
        int idx = 0;
        while (idx < families.length) {
            families[idx] = Bytes.toBytes(TestHRegionServerBulkLoad.family(idx));
            idx++;
        }
    }

    /**
     * Repeating worker thread that scans the table over the target families
     * and checks, row by row, that every non-null value found under the shared
     * qualifier is identical across families. A mismatch is reported by
     * throwing a {@link RuntimeException} that describes both cells.
     */
    public static class AtomicScanReader
    extends MultithreadedTestUtil.RepeatingTestThread {
        /** Families added to every scan and compared against each other. */
        byte[][] targetFamilies;
        /** Table handle opened in the constructor and reused by every scan. */
        HTable table;
        /** Number of scans that ran to completion. */
        AtomicLong numScans = new AtomicLong();
        /** Number of rows that passed verification, across all scans. */
        AtomicLong numRowsScanned = new AtomicLong();
        String TABLE_NAME;

        /**
         * @param TABLE_NAME     name of the table to scan
         * @param ctx            test context this thread belongs to
         * @param targetFamilies families to scan and cross-check
         * @throws IOException if the table handle cannot be opened
         */
        public AtomicScanReader(String TABLE_NAME, MultithreadedTestUtil.TestContext ctx, byte[][] targetFamilies) throws IOException {
            super(ctx);
            this.TABLE_NAME = TABLE_NAME;
            this.targetFamilies = targetFamilies;
            this.table = new HTable(conf, TABLE_NAME);
        }

        /**
         * Performs one scan over the target families and verifies every returned row.
         *
         * @throws Exception if the scan fails or two families of one row disagree
         */
        @Override
        public void doAnAction() throws Exception {
            Scan s = new Scan();
            for (byte[] family : this.targetFamilies) {
                s.addFamily(family);
            }
            ResultScanner scanner = this.table.getScanner(s);
            try {
                for (Result res : scanner) {
                    // Comparison state is per row: only cells of the same row must agree.
                    byte[] lastRow = null;
                    byte[] lastFam = null;
                    byte[] lastQual = null;
                    byte[] gotValue = null;
                    for (byte[] family : this.targetFamilies) {
                        byte[] qualifier = QUAL;
                        byte[] thisValue = res.getValue(family, qualifier);
                        if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
                            StringBuilder msg = new StringBuilder();
                            msg.append("Failed on scan ").append(this.numScans).append(" after scanning ").append(this.numRowsScanned).append(" rows!\n");
                            msg.append("Current  was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family) + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
                            msg.append("Previous  was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
                            throw new RuntimeException(msg.toString());
                        }
                        lastFam = family;
                        lastQual = qualifier;
                        lastRow = res.getRow();
                        gotValue = thisValue;
                    }
                    this.numRowsScanned.getAndIncrement();
                }
            }
            finally {
                // Release the scanner even when verification or iteration throws.
                scanner.close();
            }
            this.numScans.getAndIncrement();
        }
    }

    /**
     * Repeating worker thread that, on every iteration, writes one HFile per
     * column family (all carrying the same iteration-specific value) and
     * submits them together in a single bulk-load request. After every tenth
     * bulk load it also sends a compaction request for the region.
     */
    public static class AtomicHFileLoader
    extends MultithreadedTestUtil.RepeatingTestThread {
        /** Number of bulk-load iterations started so far. */
        final AtomicLong numBulkLoads = new AtomicLong();
        /** Number of compaction requests sent so far. */
        final AtomicLong numCompactions = new AtomicLong();
        private String tableName;

        /**
         * @param tableName      name of the table to bulk load into
         * @param ctx            test context this thread belongs to
         * @param targetFamilies unused by this class; kept so existing callers still compile
         * @throws IOException declared for callers; nothing in this constructor body throws it
         */
        public AtomicHFileLoader(String tableName, MultithreadedTestUtil.TestContext ctx, byte[][] targetFamilies) throws IOException {
            super(ctx);
            this.tableName = tableName;
        }

        /**
         * Writes {@code NUM_CFS} HFiles of 1000 rows each into a fresh
         * per-iteration directory, bulk loads them, and compacts the region
         * whenever the bulk-load counter is a multiple of ten.
         *
         * @throws Exception if writing the HFiles or either request fails
         */
        @Override
        public void doAnAction() throws Exception {
            long iteration = this.numBulkLoads.getAndIncrement();
            Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
            FileSystem fs = UTIL.getTestFileSystem();
            // The value encodes the iteration number, so all families loaded together share it.
            byte[] val = Bytes.toBytes(String.format("%010d", iteration));
            final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(NUM_CFS);
            for (int i = 0; i < NUM_CFS; ++i) {
                Path hfile = new Path(dir, TestHRegionServerBulkLoad.family(i));
                byte[] fam = Bytes.toBytes(TestHRegionServerBulkLoad.family(i));
                TestHRegionServerBulkLoad.createHFile(fs, hfile, fam, QUAL, val, 1000);
                famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
            }

            // Submit all family files in one request to the region located for row "aaa".
            final HConnection conn = UTIL.getHBaseAdmin().getConnection();
            TableName tbl = TableName.valueOf(this.tableName);
            RegionServerCallable<Void> bulkLoad = new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")){

                @Override
                public Void call() throws Exception {
                    LOG.debug("Going to connect to server " + this.getLocation() + " for row " + Bytes.toStringBinary(this.getRow()));
                    byte[] regionName = this.getLocation().getRegionInfo().getRegionName();
                    ClientProtos.BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
                    this.getStub().bulkLoadHFile(null, request);
                    return null;
                }
            };
            RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
            RpcRetryingCaller<Void> caller = factory.<Void>newCaller();
            caller.callWithRetries(bulkLoad);

            // The counter was already incremented above, so this fires on every tenth iteration.
            if (this.numBulkLoads.get() % 10L == 0L) {
                RegionServerCallable<Void> compaction = new RegionServerCallable<Void>(conn, tbl, Bytes.toBytes("aaa")){

                    @Override
                    public Void call() throws Exception {
                        LOG.debug("compacting " + this.getLocation() + " for row " + Bytes.toStringBinary(this.getRow()));
                        AdminProtos.AdminService.BlockingInterface server = conn.getAdmin(this.getLocation().getServerName());
                        // NOTE(review): the boolean `true` presumably requests a major compaction - confirm against RequestConverter.
                        AdminProtos.CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(this.getLocation().getRegionInfo().getRegionName(), true, null);
                        server.compactRegion(null, request);
                        AtomicHFileLoader.this.numCompactions.incrementAndGet();
                        return null;
                    }
                };
                caller.callWithRetries(compaction);
            }
        }
    }
}

