/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={RegionServerTests.class, MediumTests.class})
public class TestCompactionWithThroughputController {
    private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    private static final double EPSILON = 1.0E-6;
    private final TableName tableName = TableName.valueOf(this.getClass().getSimpleName());
    private final byte[] family = Bytes.toBytes("f");
    private final byte[] qualifier = Bytes.toBytes("q");

    private Store getStoreWithName(TableName tableName) {
        MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
        List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
        for (int i = 0; i < cluster.getRegionServerThreads().size(); ++i) {
            HRegionServer hrs = rsts.get(i).getRegionServer();
            Iterator<Region> iterator = hrs.getOnlineRegions(tableName).iterator();
            if (!iterator.hasNext()) continue;
            Region region = iterator.next();
            return region.getStores().iterator().next();
        }
        return null;
    }

    private Store prepareData() throws IOException {
        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
        if (admin.tableExists(this.tableName)) {
            admin.disableTable(this.tableName);
            admin.deleteTable(this.tableName);
        }
        HTable table = TEST_UTIL.createTable(this.tableName, this.family);
        Random rand = new Random();
        for (int i = 0; i < 10; ++i) {
            for (int j = 0; j < 10; ++j) {
                byte[] value = new byte[131072];
                rand.nextBytes(value);
                table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(this.family, this.qualifier, value));
            }
            admin.flush(this.tableName);
        }
        return this.getStoreWithName(this.tableName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long testCompactionWithThroughputLimit() throws Exception {
        long throughputLimit = 0x100000L;
        Configuration conf = TEST_UTIL.getConfiguration();
        conf.set("hbase.hstore.engine.class", DefaultStoreEngine.class.getName());
        conf.setInt("hbase.hstore.compaction.min", 100);
        conf.setInt("hbase.hstore.compaction.max", 200);
        conf.setInt("hbase.hstore.blockingStoreFiles", 10000);
        conf.setLong("hbase.hstore.compaction.throughput.higher.bound", throughputLimit);
        conf.setLong("hbase.hstore.compaction.throughput.lower.bound", throughputLimit);
        conf.set("hbase.regionserver.throughput.controller", PressureAwareCompactionThroughputController.class.getName());
        TEST_UTIL.startMiniCluster(1);
        try {
            Store store = this.prepareData();
            Assert.assertEquals((long)10L, (long)store.getStorefilesCount());
            long startTime = System.currentTimeMillis();
            TEST_UTIL.getHBaseAdmin().majorCompact(this.tableName);
            while (store.getStorefilesCount() != 1) {
                Thread.sleep(20L);
            }
            long duration = System.currentTimeMillis() - startTime;
            double throughput = (double)store.getStorefilesSize() / (double)duration * 1000.0;
            Assert.assertTrue((throughput < (double)throughputLimit * 1.2 ? 1 : 0) != 0);
            Assert.assertTrue((throughput > (double)throughputLimit * 0.8 ? 1 : 0) != 0);
            long l = System.currentTimeMillis() - startTime;
            return l;
        }
        finally {
            TEST_UTIL.shutdownMiniCluster();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long testCompactionWithoutThroughputLimit() throws Exception {
        Configuration conf = TEST_UTIL.getConfiguration();
        conf.set("hbase.hstore.engine.class", DefaultStoreEngine.class.getName());
        conf.setInt("hbase.hstore.compaction.min", 100);
        conf.setInt("hbase.hstore.compaction.max", 200);
        conf.setInt("hbase.hstore.blockingStoreFiles", 10000);
        conf.set("hbase.regionserver.throughput.controller", NoLimitThroughputController.class.getName());
        TEST_UTIL.startMiniCluster(1);
        try {
            Store store = this.prepareData();
            Assert.assertEquals((long)10L, (long)store.getStorefilesCount());
            long startTime = System.currentTimeMillis();
            TEST_UTIL.getHBaseAdmin().majorCompact(this.tableName);
            while (store.getStorefilesCount() != 1) {
                Thread.sleep(20L);
            }
            long l = System.currentTimeMillis() - startTime;
            return l;
        }
        finally {
            TEST_UTIL.shutdownMiniCluster();
        }
    }

    @Test
    public void testCompaction() throws Exception {
        long limitTime = this.testCompactionWithThroughputLimit();
        long noLimitTime = this.testCompactionWithoutThroughputLimit();
        LOG.info((Object)("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use " + noLimitTime + "ms"));
        Assert.assertTrue((limitTime > noLimitTime * 2L ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testThroughputTuning() throws Exception {
        Configuration conf = TEST_UTIL.getConfiguration();
        conf.set("hbase.hstore.engine.class", DefaultStoreEngine.class.getName());
        conf.setLong("hbase.hstore.compaction.throughput.higher.bound", 0x1400000L);
        conf.setLong("hbase.hstore.compaction.throughput.lower.bound", 0xA00000L);
        conf.setInt("hbase.hstore.compaction.min", 4);
        conf.setInt("hbase.hstore.blockingStoreFiles", 6);
        conf.set("hbase.regionserver.throughput.controller", PressureAwareCompactionThroughputController.class.getName());
        conf.setInt("hbase.hstore.compaction.throughput.tune.period", 1000);
        TEST_UTIL.startMiniCluster(1);
        Connection conn = ConnectionFactory.createConnection(conf);
        try {
            byte[] value;
            HTableDescriptor htd = new HTableDescriptor(this.tableName);
            htd.addFamily(new HColumnDescriptor(this.family));
            htd.setCompactionEnabled(false);
            TEST_UTIL.getHBaseAdmin().createTable(htd);
            TEST_UTIL.waitTableAvailable(this.tableName);
            HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(this.tableName);
            PressureAwareCompactionThroughputController throughputController = (PressureAwareCompactionThroughputController)regionServer.compactSplitThread.getCompactionThroughputController();
            Assert.assertEquals((double)1.048576E7, (double)throughputController.getMaxThroughput(), (double)1.0E-6);
            Table table = conn.getTable(this.tableName);
            for (int i = 0; i < 5; ++i) {
                value = new byte[]{};
                table.put(new Put(Bytes.toBytes(i)).addColumn(this.family, this.qualifier, value));
                TEST_UTIL.flush(this.tableName);
            }
            Thread.sleep(2000L);
            Assert.assertEquals((double)1.572864E7, (double)throughputController.getMaxThroughput(), (double)1.0E-6);
            byte[] value1 = new byte[]{};
            table.put(new Put(Bytes.toBytes(5)).addColumn(this.family, this.qualifier, value1));
            TEST_UTIL.flush(this.tableName);
            Thread.sleep(2000L);
            Assert.assertEquals((double)2.097152E7, (double)throughputController.getMaxThroughput(), (double)1.0E-6);
            value = new byte[]{};
            table.put(new Put(Bytes.toBytes(6)).addColumn(this.family, this.qualifier, value));
            TEST_UTIL.flush(this.tableName);
            Thread.sleep(2000L);
            Assert.assertEquals((double)Double.MAX_VALUE, (double)throughputController.getMaxThroughput(), (double)1.0E-6);
            conf.set("hbase.regionserver.throughput.controller", NoLimitThroughputController.class.getName());
            regionServer.compactSplitThread.onConfigurationChange(conf);
            Assert.assertTrue((boolean)throughputController.isStopped());
            Assert.assertTrue((boolean)(regionServer.compactSplitThread.getCompactionThroughputController() instanceof NoLimitThroughputController));
        }
        finally {
            conn.close();
            TEST_UTIL.shutdownMiniCluster();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testGetCompactionPressureForStripedStore() throws Exception {
        Configuration conf = TEST_UTIL.getConfiguration();
        conf.set("hbase.hstore.engine.class", StripeStoreEngine.class.getName());
        conf.setBoolean("hbase.store.stripe.compaction.flushToL0", false);
        conf.setInt("hbase.store.stripe.initialStripeCount", 2);
        conf.setInt("hbase.store.stripe.compaction.minFiles", 4);
        conf.setInt("hbase.hstore.blockingStoreFiles", 12);
        TEST_UTIL.startMiniCluster(1);
        Connection conn = ConnectionFactory.createConnection(conf);
        try {
            HTableDescriptor htd = new HTableDescriptor(this.tableName);
            htd.addFamily(new HColumnDescriptor(this.family));
            htd.setCompactionEnabled(false);
            TEST_UTIL.getHBaseAdmin().createTable(htd);
            TEST_UTIL.waitTableAvailable(this.tableName);
            HStore store = (HStore)this.getStoreWithName(this.tableName);
            Assert.assertEquals((long)0L, (long)store.getStorefilesCount());
            Assert.assertEquals((double)0.0, (double)store.getCompactionPressure(), (double)1.0E-6);
            Table table = conn.getTable(this.tableName);
            for (int i = 0; i < 4; ++i) {
                byte[] value1 = new byte[]{};
                table.put(new Put(Bytes.toBytes(i)).addColumn(this.family, this.qualifier, value1));
                byte[] value = new byte[]{};
                table.put(new Put(Bytes.toBytes(100 + i)).addColumn(this.family, this.qualifier, value));
                TEST_UTIL.flush(this.tableName);
            }
            Assert.assertEquals((long)8L, (long)store.getStorefilesCount());
            Assert.assertEquals((double)0.0, (double)store.getCompactionPressure(), (double)1.0E-6);
            byte[] value5 = new byte[]{};
            table.put(new Put(Bytes.toBytes(4)).addColumn(this.family, this.qualifier, value5));
            byte[] value4 = new byte[]{};
            table.put(new Put(Bytes.toBytes(104)).addColumn(this.family, this.qualifier, value4));
            TEST_UTIL.flush(this.tableName);
            Assert.assertEquals((long)10L, (long)store.getStorefilesCount());
            Assert.assertEquals((double)0.5, (double)store.getCompactionPressure(), (double)1.0E-6);
            byte[] value3 = new byte[]{};
            table.put(new Put(Bytes.toBytes(5)).addColumn(this.family, this.qualifier, value3));
            byte[] value2 = new byte[]{};
            table.put(new Put(Bytes.toBytes(105)).addColumn(this.family, this.qualifier, value2));
            TEST_UTIL.flush(this.tableName);
            Assert.assertEquals((long)12L, (long)store.getStorefilesCount());
            Assert.assertEquals((double)1.0, (double)store.getCompactionPressure(), (double)1.0E-6);
            byte[] value1 = new byte[]{};
            table.put(new Put(Bytes.toBytes(6)).addColumn(this.family, this.qualifier, value1));
            byte[] value = new byte[]{};
            table.put(new Put(Bytes.toBytes(106)).addColumn(this.family, this.qualifier, value));
            TEST_UTIL.flush(this.tableName);
            Assert.assertEquals((long)14L, (long)store.getStorefilesCount());
            Assert.assertEquals((double)2.0, (double)store.getCompactionPressure(), (double)1.0E-6);
        }
        finally {
            conn.close();
            TEST_UTIL.shutdownMiniCluster();
        }
    }
}

