/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import drill.shaded.hbase.guava.com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={LargeTests.class})
public class TestHRegionServerBulkLoad {
    private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
    private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
    private static final Configuration conf = UTIL.getConfiguration();
    private static final byte[] QUAL = Bytes.toBytes("qual");
    private static final int NUM_CFS = 10;
    private int sleepDuration;
    public static int BLOCKSIZE = 65536;
    public static Compression.Algorithm COMPRESSION = Compression.Algorithm.NONE;
    private static final byte[][] families = new byte[10][];

    @Parameterized.Parameters
    public static final Collection<Object[]> parameters() {
        int[] sleepDurations = new int[]{0, 30000};
        ArrayList<Object[]> configurations = new ArrayList<Object[]>();
        for (int i : sleepDurations) {
            configurations.add(new Object[]{i});
        }
        return configurations;
    }

    public TestHRegionServerBulkLoad(int duration) {
        this.sleepDuration = duration;
    }

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        conf.setInt("hbase.rpc.timeout", 10000);
    }

    public static byte[] rowkey(int i) {
        return Bytes.toBytes(String.format("row_%08d", i));
    }

    static String family(int i) {
        return String.format("family_%04d", i);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void createHFile(FileSystem fs, Path path, byte[] family, byte[] qualifier, byte[] value, int numRows) throws IOException {
        HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(COMPRESSION).build();
        long now = System.currentTimeMillis();
        try (HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path).withFileContext(context).create();){
            for (int i = 0; i < numRows; ++i) {
                KeyValue kv = new KeyValue(TestHRegionServerBulkLoad.rowkey(i), family, qualifier, now, value);
                writer.append(kv);
            }
            writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now));
        }
    }

    private void setupTable(TableName table, int cfs) throws IOException {
        try {
            LOG.info((Object)("Creating table " + table));
            HTableDescriptor htd = new HTableDescriptor(table);
            htd.addCoprocessor(MyObserver.class.getName());
            MyObserver.sleepDuration = this.sleepDuration;
            for (int i = 0; i < 10; ++i) {
                htd.addFamily(new HColumnDescriptor(TestHRegionServerBulkLoad.family(i)));
            }
            UTIL.getHBaseAdmin().createTable(htd);
        }
        catch (TableExistsException tee) {
            LOG.info((Object)("Table " + table + " already exists"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAtomicBulkLoad() throws Exception {
        TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
        int millisToRun = 30000;
        int numScanners = 50;
        UTIL.startMiniCluster(1, false, true);
        try {
            WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
            FindBulkHBaseListener listener = new FindBulkHBaseListener();
            log.registerWALActionsListener(listener);
            this.runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
            Assert.assertThat((Object)listener.isFound(), (Matcher)Is.is((Object)true));
        }
        finally {
            UTIL.shutdownMiniCluster();
        }
    }

    void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) throws Exception {
        this.setupTable(tableName, 10);
        MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(UTIL.getConfiguration());
        AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, (byte[][])null);
        ctx.addThread(loader);
        ArrayList<AtomicScanReader> scanners = Lists.newArrayList();
        for (int i = 0; i < numScanners; ++i) {
            AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
            scanners.add(scanner);
            ctx.addThread(scanner);
        }
        ctx.startThreads();
        ctx.waitFor(millisToRun);
        ctx.stop();
        LOG.info((Object)"Loaders:");
        LOG.info((Object)("  loaded " + loader.numBulkLoads.get()));
        LOG.info((Object)("  compations " + loader.numCompactions.get()));
        LOG.info((Object)"Scanners:");
        for (AtomicScanReader scanner : scanners) {
            LOG.info((Object)("  scanned " + scanner.numScans.get()));
            LOG.info((Object)("  verified " + scanner.numRowsScanned.get() + " rows"));
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            Configuration c = HBaseConfiguration.create();
            TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
            test.setConf(c);
            test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 300000, 50);
        }
        finally {
            System.exit(0);
        }
    }

    private void setConf(Configuration c) {
        UTIL = new HBaseTestingUtility(c);
    }

    static {
        for (int i = 0; i < 10; ++i) {
            TestHRegionServerBulkLoad.families[i] = Bytes.toBytes(TestHRegionServerBulkLoad.family(i));
        }
    }

    static class FindBulkHBaseListener
    extends TestWALActionsListener.DummyWALActionsListener {
        private boolean found = false;

        FindBulkHBaseListener() {
        }

        @Override
        public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
            for (Cell cell : logEdit.getCells()) {
                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
                    if (!entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) continue;
                    this.found = true;
                }
            }
        }

        public boolean isFound() {
            return this.found;
        }
    }

    public static class AtomicScanReader
    extends MultithreadedTestUtil.RepeatingTestThread {
        byte[][] targetFamilies;
        HTable table;
        AtomicLong numScans = new AtomicLong();
        AtomicLong numRowsScanned = new AtomicLong();
        TableName TABLE_NAME;

        public AtomicScanReader(TableName TABLE_NAME, MultithreadedTestUtil.TestContext ctx, byte[][] targetFamilies) throws IOException {
            super(ctx);
            this.TABLE_NAME = TABLE_NAME;
            this.targetFamilies = targetFamilies;
            this.table = new HTable(conf, TABLE_NAME);
        }

        @Override
        public void doAnAction() throws Exception {
            Scan s = new Scan();
            for (byte[] family : this.targetFamilies) {
                s.addFamily(family);
            }
            ResultScanner scanner = this.table.getScanner(s);
            for (Result res : scanner) {
                byte[] lastRow = null;
                byte[] lastFam = null;
                byte[] lastQual = null;
                byte[] gotValue = null;
                for (byte[] family : this.targetFamilies) {
                    byte[] qualifier = QUAL;
                    byte[] thisValue = res.getValue(family, qualifier);
                    if (gotValue != null && thisValue != null && !Bytes.equals(gotValue, thisValue)) {
                        StringBuilder msg = new StringBuilder();
                        msg.append("Failed on scan ").append(this.numScans).append(" after scanning ").append(this.numRowsScanned).append(" rows!\n");
                        msg.append("Current  was " + Bytes.toString(res.getRow()) + "/" + Bytes.toString(family) + ":" + Bytes.toString(qualifier) + " = " + Bytes.toString(thisValue) + "\n");
                        msg.append("Previous  was " + Bytes.toString(lastRow) + "/" + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual) + " = " + Bytes.toString(gotValue));
                        throw new RuntimeException(msg.toString());
                    }
                    lastFam = family;
                    lastQual = qualifier;
                    lastRow = res.getRow();
                    gotValue = thisValue;
                }
                this.numRowsScanned.getAndIncrement();
            }
            this.numScans.getAndIncrement();
        }
    }

    public static class MyObserver
    extends BaseRegionObserver {
        static int sleepDuration;

        @Override
        public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, InternalScanner scanner, ScanType scanType) throws IOException {
            try {
                Thread.sleep(sleepDuration);
            }
            catch (InterruptedException ie) {
                InterruptedIOException ioe = new InterruptedIOException();
                ioe.initCause(ie);
                throw ioe;
            }
            return scanner;
        }
    }

    public static class AtomicHFileLoader
    extends MultithreadedTestUtil.RepeatingTestThread {
        final AtomicLong numBulkLoads = new AtomicLong();
        final AtomicLong numCompactions = new AtomicLong();
        private TableName tableName;

        public AtomicHFileLoader(TableName tableName, MultithreadedTestUtil.TestContext ctx, byte[][] targetFamilies) throws IOException {
            super(ctx);
            this.tableName = tableName;
        }

        @Override
        public void doAnAction() throws Exception {
            long iteration = this.numBulkLoads.getAndIncrement();
            Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));
            FileSystem fs = UTIL.getTestFileSystem();
            byte[] val = Bytes.toBytes(String.format("%010d", iteration));
            final ArrayList<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(10);
            for (int i = 0; i < 10; ++i) {
                Path hfile = new Path(dir, TestHRegionServerBulkLoad.family(i));
                byte[] fam = Bytes.toBytes(TestHRegionServerBulkLoad.family(i));
                TestHRegionServerBulkLoad.createHFile(fs, hfile, fam, QUAL, val, 1000);
                famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
            }
            final HConnection conn = UTIL.getHBaseAdmin().getConnection();
            RegionServerCallable<Void> callable = new RegionServerCallable<Void>((Connection)conn, this.tableName, Bytes.toBytes("aaa")){

                @Override
                public Void call(int callTimeout) throws Exception {
                    LOG.debug((Object)("Going to connect to server " + this.getLocation() + " for row " + Bytes.toStringBinary(this.getRow())));
                    byte[] regionName = this.getLocation().getRegionInfo().getRegionName();
                    ClientProtos.BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
                    this.getStub().bulkLoadHFile(null, request);
                    return null;
                }
            };
            RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
            RpcRetryingCaller<Void> caller = factory.newCaller();
            caller.callWithRetries(callable, Integer.MAX_VALUE);
            if (this.numBulkLoads.get() % 5L == 0L) {
                callable = new RegionServerCallable<Void>((Connection)conn, this.tableName, Bytes.toBytes("aaa")){

                    @Override
                    public Void call(int callTimeout) throws Exception {
                        LOG.debug((Object)("compacting " + this.getLocation() + " for row " + Bytes.toStringBinary(this.getRow())));
                        AdminProtos.AdminService.BlockingInterface server = conn.getAdmin(this.getLocation().getServerName());
                        AdminProtos.CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(this.getLocation().getRegionInfo().getRegionName(), true, null);
                        server.compactRegion(null, request);
                        numCompactions.incrementAndGet();
                        return null;
                    }
                };
                caller.callWithRetries(callable, Integer.MAX_VALUE);
            }
        }
    }
}

