/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
import org.apache.hadoop.hbase.regionserver.FlushAllStoresPolicy;
import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hive.com.google.common.hash.Hashing;
import org.apache.hive.org.apache.commons.logging.Log;
import org.apache.hive.org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={LargeTests.class})
public class TestPerColumnFamilyFlush {
    /** Logger shared by all tests in this class. */
    private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class);
    /**
     * Region used by the tests that drive a standalone region; assigned by
     * {@code initHRegion} (and also by {@code testCompareStoreFileCount}).
     */
    Region region = null;
    /** Testing utility that provides the configuration, the mini cluster and test directories. */
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    /**
     * Parent directory for standalone regions; {@code initHRegion} creates one child per calling
     * method. NOTE(review): the name "TestHRegion" looks copied from another test - confirm.
     */
    private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
    /** Table used by most tests: namespace "TestPerColumnFamilyFlush", qualifier "t1". */
    public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
    /** The five column families f1..f5; {@code createPut} addresses them with a 1-based index. */
    public static final byte[][] FAMILIES = new byte[][]{Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5")};
    /** Alias for the first family, "f1". */
    public static final byte[] FAMILY1 = FAMILIES[0];
    /** Alias for the second family, "f2". */
    public static final byte[] FAMILY2 = FAMILIES[1];
    /** Alias for the third family, "f3". */
    public static final byte[] FAMILY3 = FAMILIES[2];

    /**
     * Creates a standalone region of {@link #TABLENAME} that carries every family in
     * {@link #FAMILIES} and stores it in {@link #region}.
     *
     * @param callingMethod name of the sub-directory of {@code DIR} the region is created in
     * @param conf configuration passed to the region
     * @throws IOException if the region cannot be created
     */
    private void initHRegion(String callingMethod, Configuration conf) throws IOException {
        HTableDescriptor tableDescriptor = new HTableDescriptor(TABLENAME);
        for (int idx = 0; idx < FAMILIES.length; ++idx) {
            tableDescriptor.addFamily(new HColumnDescriptor(FAMILIES[idx]));
        }
        Path regionDir = new Path(DIR, callingMethod);
        // No start key and no end key are given for the region.
        HRegionInfo regionInfo = new HRegionInfo(TABLENAME, null, null, false);
        this.region = HRegion.createHRegion(regionInfo, regionDir, conf, tableDescriptor);
    }

    /**
     * Builds a single-cell put for row {@code "row<familyNum>-<putNum>"}, qualifier
     * {@code "q<familyNum>"} and value {@code "val<familyNum>-<putNum>"}.
     *
     * @param familyNum 1-based index into {@link #FAMILIES}
     * @param putNum sequence number embedded in the row key and the value
     * @return the put carrying that one cell
     */
    private Put createPut(int familyNum, int putNum) {
        String suffix = familyNum + "-" + putNum;
        Put put = new Put(Bytes.toBytes("row" + suffix));
        put.addColumn(FAMILIES[familyNum - 1], Bytes.toBytes("q" + familyNum), Bytes.toBytes("val" + suffix));
        return put;
    }

    /**
     * Builds a get for the row that {@code createPut(familyNum, putNum)} writes.
     *
     * @param familyNum family number embedded in the row key
     * @param putNum sequence number embedded in the row key
     * @return a get for row {@code "row<familyNum>-<putNum>"}
     */
    private Get createGet(int familyNum, int putNum) {
        String rowKey = "row" + familyNum + "-" + putNum;
        return new Get(Bytes.toBytes(rowKey));
    }

    /**
     * Reads back the cell written by {@code createPut(familyNum, putNum)} and asserts that the
     * family map exists, the qualifier is present and the stored value matches.
     *
     * @param familyNum 1-based index into {@link #FAMILIES}
     * @param putNum sequence number of the put being verified
     * @param table table to read from
     * @throws IOException if the get fails
     */
    void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
        Result result = table.get(this.createGet(familyNum, putNum));
        byte[] family = FAMILIES[familyNum - 1];
        byte[] qualifier = Bytes.toBytes("q" + familyNum);
        byte[] expected = Bytes.toBytes("val" + familyNum + "-" + putNum);
        String missing = "Missing Put#" + putNum + " for CF# " + familyNum;
        Assert.assertNotNull(missing, result.getFamilyMap(family));
        byte[] actual = (byte[]) result.getFamilyMap(family).get(qualifier);
        Assert.assertNotNull(missing, actual);
        Assert.assertTrue("Incorrect value for Put#" + putNum + " for CF# " + familyNum, Arrays.equals(actual, expected));
    }

    /**
     * Drives a standalone region configured with {@link FlushLargeStoresPolicy} through three
     * rounds of writes and flushes, checking the per-family memstore sizes and the WAL's
     * earliest memstore sequence number after each flush.
     *
     * @throws IOException if creating, writing to or flushing the region fails
     */
    @Test(timeout=180000L)
    public void testSelectiveFlushWhenEnabled() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.setLong("hbase.hregion.memstore.flush.size", 204800L);
        conf.set("hbase.regionserver.flush.policy", FlushLargeStoresPolicy.class.getName());
        conf.setLong("hbase.hregion.percolumnfamilyflush.size.lower.bound", 102400L);
        this.initHRegion("testSelectiveFlushWhenEnabled", conf);

        // Round 1: 1200 rows into CF1, the first 100 of them also into CF2, the first 50 into CF3.
        for (int n = 1; n <= 1200; ++n) {
            this.region.put(this.createPut(1, n));
            if (n <= 100) {
                this.region.put(this.createPut(2, n));
                if (n <= 50) {
                    this.region.put(this.createPut(3, n));
                }
            }
        }
        long regionSize = this.region.getMemstoreSize();
        long oldestSeqCf1 = this.region.getOldestSeqIdOfStore(FAMILY1);
        long oldestSeqCf2 = this.region.getOldestSeqIdOfStore(FAMILY2);
        long oldestSeqCf3 = this.region.getOldestSeqIdOfStore(FAMILY3);
        long cf1Size = this.region.getStore(FAMILY1).getMemStoreSize();
        long cf2Size = this.region.getStore(FAMILY2).getMemStoreSize();
        long cf3Size = this.region.getStore(FAMILY3).getMemStoreSize();
        long walEarliestSeq = this.getWAL(this.region).getEarliestMemstoreSeqNum(this.region.getRegionInfo().getEncodedNameAsBytes());

        // CF1 was written first, so it must hold the oldest edit known to the WAL.
        Assert.assertEquals(oldestSeqCf1, walEarliestSeq);
        Assert.assertTrue(oldestSeqCf1 < oldestSeqCf2);
        Assert.assertTrue(oldestSeqCf2 < oldestSeqCf3);
        Assert.assertTrue(cf1Size > 0L);
        Assert.assertTrue(cf2Size > 0L);
        Assert.assertTrue(cf3Size > 0L);
        // Each store reports DEEP_OVERHEAD on top of its data; the region total does not.
        Assert.assertEquals(regionSize + 3L * DefaultMemStore.DEEP_OVERHEAD, cf1Size + cf2Size + cf3Size);

        // First flush: expected to empty CF1 only and leave CF2 and CF3 untouched.
        this.region.flush(false);
        long cf2SizeBeforeFlush = cf2Size;
        long cf3SizeBeforeFlush = cf3Size;
        cf1Size = this.region.getStore(FAMILY1).getMemStoreSize();
        cf2Size = this.region.getStore(FAMILY2).getMemStoreSize();
        cf3Size = this.region.getStore(FAMILY3).getMemStoreSize();
        regionSize = this.region.getMemstoreSize();
        walEarliestSeq = this.getWAL(this.region).getEarliestMemstoreSeqNum(this.region.getRegionInfo().getEncodedNameAsBytes());
        Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1Size);
        Assert.assertEquals(cf2Size, cf2SizeBeforeFlush);
        Assert.assertEquals(cf3Size, cf3SizeBeforeFlush);
        Assert.assertEquals(walEarliestSeq, oldestSeqCf2);
        Assert.assertEquals(regionSize + 2L * DefaultMemStore.DEEP_OVERHEAD, cf2Size + cf3Size);

        // Round 2: 1200 more rows into CF2, the first 100 of them also into CF3.
        for (int n = 1200; n < 2400; ++n) {
            this.region.put(this.createPut(2, n));
            if (n - 1200 < 100) {
                this.region.put(this.createPut(3, n));
            }
        }
        cf3SizeBeforeFlush = this.region.getStore(FAMILY3).getMemStoreSize();

        // Second flush: expected to empty CF2 and leave CF3 as the only family holding data.
        this.region.flush(false);
        cf1Size = this.region.getStore(FAMILY1).getMemStoreSize();
        cf2Size = this.region.getStore(FAMILY2).getMemStoreSize();
        cf3Size = this.region.getStore(FAMILY3).getMemStoreSize();
        regionSize = this.region.getMemstoreSize();
        walEarliestSeq = this.getWAL(this.region).getEarliestMemstoreSeqNum(this.region.getRegionInfo().getEncodedNameAsBytes());
        Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1Size);
        Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2Size);
        Assert.assertEquals(cf3Size, cf3SizeBeforeFlush);
        Assert.assertEquals(regionSize + DefaultMemStore.DEEP_OVERHEAD, cf3Size);
        Assert.assertEquals(walEarliestSeq, oldestSeqCf3);

        // NOTE(review): flush(true) is presumably the "flush every store" form - confirm.
        this.region.flush(true);

        // Round 3: 300 rows into each of the five families; the following flush is expected to
        // leave the whole region memstore empty.
        for (int n = 1; n <= 300; ++n) {
            for (int familyNum = 1; familyNum <= 5; ++familyNum) {
                this.region.put(this.createPut(familyNum, n));
            }
        }
        this.region.flush(false);
        Assert.assertEquals(0L, this.region.getMemstoreSize());
    }

    /**
     * Runs the same first write round as {@code testSelectiveFlushWhenEnabled} on a region
     * configured with {@link FlushAllStoresPolicy} and expects a single flush to empty all
     * three families.
     *
     * @throws IOException if creating, writing to or flushing the region fails
     */
    @Test(timeout=180000L)
    public void testSelectiveFlushWhenNotEnabled() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.setLong("hbase.hregion.memstore.flush.size", 204800L);
        conf.set("hbase.regionserver.flush.policy", FlushAllStoresPolicy.class.getName());
        this.initHRegion("testSelectiveFlushWhenNotEnabled", conf);

        // 1200 rows into CF1, the first 100 of them also into CF2, the first 50 into CF3.
        for (int n = 1; n <= 1200; ++n) {
            this.region.put(this.createPut(1, n));
            if (n <= 100) {
                this.region.put(this.createPut(2, n));
                if (n <= 50) {
                    this.region.put(this.createPut(3, n));
                }
            }
        }
        long regionSize = this.region.getMemstoreSize();
        long cf1Size = this.region.getStore(FAMILY1).getMemStoreSize();
        long cf2Size = this.region.getStore(FAMILY2).getMemStoreSize();
        long cf3Size = this.region.getStore(FAMILY3).getMemStoreSize();
        Assert.assertTrue(cf1Size > 0L);
        Assert.assertTrue(cf2Size > 0L);
        Assert.assertTrue(cf3Size > 0L);
        // Each store reports DEEP_OVERHEAD on top of its data; the region total does not.
        Assert.assertEquals(regionSize + 3L * DefaultMemStore.DEEP_OVERHEAD, cf1Size + cf2Size + cf3Size);

        this.region.flush(false);

        cf1Size = this.region.getStore(FAMILY1).getMemStoreSize();
        cf2Size = this.region.getStore(FAMILY2).getMemStoreSize();
        cf3Size = this.region.getStore(FAMILY3).getMemStoreSize();
        regionSize = this.region.getMemstoreSize();
        long walEarliestSeq = this.getWAL(this.region).getEarliestMemstoreSeqNum(this.region.getRegionInfo().getEncodedNameAsBytes());
        // Every family is back to the bare overhead, and the test expects the WAL to report -1
        // as the earliest memstore sequence number for the region.
        Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1Size);
        Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2Size);
        Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3Size);
        Assert.assertEquals(0L, regionSize);
        Assert.assertEquals(-1L, walEarliestSeq);
    }

    /**
     * Finds a region of the given table on the mini cluster together with the region server
     * hosting it. Servers are visited in the order reported by the cluster and the first online
     * region of the first server that has one is returned.
     *
     * @param tableName table whose region is wanted
     * @return the region and its hosting server, or {@code null} if no region server currently
     *         has a region of that table online - callers must check before dereferencing
     */
    private static Pair<Region, HRegionServer> getRegionWithName(TableName tableName) {
        MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
        List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
        // Bound the loop by the list that is actually indexed, rather than by a second call to
        // getRegionServerThreads() that could disagree with it.
        for (int i = 0; i < rsts.size(); ++i) {
            HRegionServer hrs = rsts.get(i).getRegionServer();
            List<Region> onlineRegions = hrs.getOnlineRegions(tableName);
            if (!onlineRegions.isEmpty()) {
                return Pair.newPair(onlineRegions.get(0), hrs);
            }
        }
        return null;
    }

    /**
     * Shared body of the two log-replay tests. Starts a four-server mini cluster, writes 80 rows
     * to CF1 and 10 rows each to CF2 and CF3, flushes the hosting region, aborts the hosting
     * region server and then verifies that every written cell can still be read.
     *
     * <p>The caller chooses the recovery mode by setting
     * {@code hbase.master.distributed.log.replay} on the shared configuration beforehand.
     *
     * @throws Exception if the cluster cannot be started or any read/write fails
     */
    private void doTestLogReplay() throws Exception {
        Configuration conf = TEST_UTIL.getConfiguration();
        conf.setLong("hbase.hregion.memstore.flush.size", 20000L);
        conf.set("hbase.regionserver.flush.policy", FlushLargeStoresPolicy.class.getName());
        conf.setLong("hbase.hregion.percolumnfamilyflush.size.lower.bound", 10000L);
        final int numRegionServers = 4;
        try {
            TEST_UTIL.startMiniCluster(numRegionServers);
            TEST_UTIL.getHBaseAdmin().createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
            try (HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES)) {
                // NOTE(review): this only changes the local descriptor object; nothing here
                // sends it back to the cluster.
                HTableDescriptor htd = table.getTableDescriptor();
                for (byte[] family : FAMILIES) {
                    if (!htd.hasFamily(family)) {
                        htd.addFamily(new HColumnDescriptor(family));
                    }
                }

                // 80 rows into CF1; the first 10 of them also into CF2 and CF3.
                for (int i = 1; i <= 80; ++i) {
                    table.put(this.createPut(1, i));
                    if (i <= 10) {
                        table.put(this.createPut(2, i));
                        table.put(this.createPut(3, i));
                    }
                }
                table.flushCommits();
                Thread.sleep(1000L);

                // Check the lookup result itself first: getRegionWithName returns null when no
                // server hosts the table, and dereferencing that would hide the intended message
                // behind a NullPointerException.
                Pair<Region, HRegionServer> desiredRegionAndServer = TestPerColumnFamilyFlush.getRegionWithName(TABLENAME);
                Assert.assertNotNull("Could not find a region which hosts the new region.", desiredRegionAndServer);
                Region desiredRegion = desiredRegionAndServer.getFirst();
                Assert.assertNotNull("Could not find a region which hosts the new region.", desiredRegion);

                // The flush is expected to empty CF1 only.
                desiredRegion.flush(false);
                long totalMemstoreSize = desiredRegion.getMemstoreSize();
                long cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
                long cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
                long cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
                Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
                Assert.assertTrue(cf2MemstoreSize > 0L);
                Assert.assertTrue(cf3MemstoreSize > 0L);
                Assert.assertEquals(totalMemstoreSize + 2L * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize + cf3MemstoreSize);
                Thread.sleep(2000L);

                // Kill the hosting server; every edit must still be readable afterwards.
                HRegionServer rs = desiredRegionAndServer.getSecond();
                rs.abort("testing");
                for (int i = 1; i <= 80; ++i) {
                    this.verifyEdit(1, i, table);
                    if (i <= 10) {
                        this.verifyEdit(2, i, table);
                        this.verifyEdit(3, i, table);
                    }
                }
            }
        } finally {
            TEST_UTIL.shutdownMiniCluster();
        }
    }

    /**
     * Runs the log-replay scenario with {@code hbase.master.distributed.log.replay} set to
     * {@code true}.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeout=180000L)
    public void testLogReplayWithDistributedReplay() throws Exception {
        Configuration sharedConf = TEST_UTIL.getConfiguration();
        sharedConf.setBoolean("hbase.master.distributed.log.replay", true);
        this.doTestLogReplay();
    }

    /**
     * Runs the log-replay scenario with {@code hbase.master.distributed.log.replay} set to
     * {@code false}.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeout=180000L)
    public void testLogReplayWithDistributedLogSplit() throws Exception {
        Configuration sharedConf = TEST_UTIL.getConfiguration();
        sharedConf.setBoolean("hbase.master.distributed.log.replay", false);
        this.doTestLogReplay();
    }

    /**
     * Returns the WAL of the given region.
     *
     * @param region region to inspect; must be an {@link HRegion}, otherwise the cast fails
     * @return the region's WAL
     */
    private WAL getWAL(Region region) {
        HRegion hRegion = (HRegion) region;
        return hRegion.getWAL();
    }

    /**
     * Returns the rolled-log-file count reported by the region's WAL.
     *
     * @param region region whose WAL is queried; the WAL must be an {@link FSHLog}
     * @return the value of {@code FSHLog.getNumRolledLogFiles()}
     */
    private int getNumRolledLogFiles(Region region) {
        WAL wal = this.getWAL(region);
        FSHLog fsHLog = (FSHLog) wal;
        return fsHLog.getNumRolledLogFiles();
    }

    /**
     * Checks that rolling the WAL past {@code hbase.regionserver.maxlogs} causes all column
     * families of the region to be flushed, including those below the per-family lower bound.
     *
     * <p>The test rolls the WAL {@code maxLogs} times while writing only to CF1, confirms that
     * nothing has been flushed yet, then requests one more roll and waits for the region
     * memstore to drain.
     *
     * @throws Exception if the cluster cannot be started, a write fails or the wait times out
     */
    @Test(timeout=180000L)
    public void testFlushingWhenLogRolling() throws Exception {
        TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
        Configuration conf = TEST_UTIL.getConfiguration();
        // 128 MiB flush size and WAL block size, one-hour roll period.
        // NOTE(review): presumably chosen so that no flush or roll happens on its own - confirm.
        conf.setLong("hbase.hregion.memstore.flush.size", 0x8000000L);
        conf.set("hbase.regionserver.flush.policy", FlushLargeStoresPolicy.class.getName());
        final long cfFlushSizeLowerBound = 2048L;
        conf.setLong("hbase.hregion.percolumnfamilyflush.size.lower.bound", cfFlushSizeLowerBound);
        conf.setLong("hbase.regionserver.logroll.period", 3600000L);
        conf.setLong("hbase.regionserver.hlog.blocksize", 0x8000000L);
        final int maxLogs = 10;
        conf.setInt("hbase.regionserver.maxlogs", maxLogs);
        final int numRegionServers = 1;
        TEST_UTIL.startMiniCluster(numRegionServers);
        try {
            HTable table = TEST_UTIL.createTable(tableName, FAMILIES);
            // Flush the namespace table before the test starts counting WAL rolls.
            try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
                admin.flush(TableName.NAMESPACE_TABLE_NAME);
            }

            // Check the lookup result itself first: getRegionWithName returns null when no
            // server hosts the table, and dereferencing that would hide the intended message
            // behind a NullPointerException.
            Pair<Region, HRegionServer> desiredRegionAndServer = TestPerColumnFamilyFlush.getRegionWithName(tableName);
            Assert.assertNotNull("Could not find a region which hosts the new region.", desiredRegionAndServer);
            final Region desiredRegion = desiredRegionAndServer.getFirst();
            Assert.assertNotNull("Could not find a region which hosts the new region.", desiredRegion);
            LOG.info("Writing to region=" + desiredRegion);

            // One row in each of CF1..CF3 so that all three memstores hold something.
            for (int familyNum = 1; familyNum <= 3; ++familyNum) {
                table.put(this.createPut(familyNum, 0));
            }
            // Then write to CF1 only, rolling the WAL after every batch of 100 rows.
            for (int batch = 0; batch < maxLogs; ++batch) {
                for (int j = 0; j < 100; ++j) {
                    table.put(this.createPut(1, batch * 100 + j));
                }
                table.flushCommits();
                int currentNumRolledLogFiles = this.getNumRolledLogFiles(desiredRegion);
                // The test expects rollWriter() to return null here.
                Assert.assertNull(this.getWAL(desiredRegion).rollWriter());
                while (this.getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
                    Thread.sleep(100L);
                }
            }
            Assert.assertEquals(maxLogs, this.getNumRolledLogFiles(desiredRegion));
            Assert.assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound);
            Assert.assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound);
            Assert.assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize() < cfFlushSizeLowerBound);

            table.put(this.createPut(1, 12345678));
            table.flushCommits();
            // Close only after the last write; previously the table was closed before this put.
            table.close();

            // Ask the server's WAL roller to roll everything, then wait up to 30 s for the
            // region memstore to become empty.
            desiredRegionAndServer.getSecond().walRoller.requestRollAll();
            TEST_UTIL.waitFor(30000L, new Waiter.ExplainingPredicate<Exception>(){

                @Override
                public boolean evaluate() throws Exception {
                    return desiredRegion.getMemstoreSize() == 0L;
                }

                @Override
                public String explainFailure() throws Exception {
                    long memstoreSize = desiredRegion.getMemstoreSize();
                    if (memstoreSize > 0L) {
                        return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
                    }
                    return "Unknown";
                }
            });
            LOG.info("Finished waiting on flush after too many WALs...");

            // Every family, including the small ones, must be back to the bare overhead.
            Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize());
            Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize());
            Assert.assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize());
            // After one more roll, the rolled-file count is expected to be below the maximum.
            Assert.assertNull(this.getWAL(desiredRegion).rollWriter(true));
            Assert.assertTrue(this.getNumRolledLogFiles(desiredRegion) < maxLogs);
        } finally {
            TEST_UTIL.shutdownMiniCluster();
        }
    }

    /**
     * Writes 10000 rows ("row-0" .. "row-9999") to the table, each with a 100-, 200- and
     * 400-byte value in CF1, CF2 and CF3 respectively. After every put it sleeps in 100 ms steps
     * for as long as the hosting region's memstore is larger than {@code memstoreFlushSize}.
     *
     * @param table table to write to; a region of it must already be online
     * @param memstoreFlushSize memstore size above which writing pauses
     * @throws IOException if a put fails
     * @throws InterruptedException if interrupted while waiting
     */
    private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
        Region hostedRegion = TestPerColumnFamilyFlush.getRegionWithName(table.getName()).getFirst();
        byte[] qualifier = Bytes.toBytes("qf");
        Random random = new Random();
        byte[] cf1Value = new byte[100];
        byte[] cf2Value = new byte[200];
        byte[] cf3Value = new byte[400];
        int rowIndex = 0;
        while (rowIndex < 10000) {
            // Re-seeding with the row index makes each row's values reproducible.
            random.setSeed(rowIndex);
            random.nextBytes(cf1Value);
            random.nextBytes(cf2Value);
            random.nextBytes(cf3Value);
            Put put = new Put(Bytes.toBytes("row-" + rowIndex));
            put.addColumn(FAMILY1, qualifier, cf1Value);
            put.addColumn(FAMILY2, qualifier, cf2Value);
            put.addColumn(FAMILY3, qualifier, cf3Value);
            table.put(put);
            while (hostedRegion.getMemstoreSize() > memstoreFlushSize) {
                Thread.sleep(100L);
            }
            ++rowIndex;
        }
    }

    /**
     * Loads the same data twice - first under {@link FlushAllStoresPolicy}, then under
     * {@link FlushLargeStoresPolicy} - and asserts that selective flushing produces fewer store
     * files for CF1 and CF2. The CF3 counts are logged but not asserted.
     *
     * @throws Exception if a cluster cannot be started or loading fails
     */
    @Test(timeout=180000L)
    public void testCompareStoreFileCount() throws Exception {
        long memstoreFlushSize = 0x100000L; // 1 MiB
        Configuration conf = TEST_UTIL.getConfiguration();
        conf.setLong("hbase.hregion.memstore.flush.size", memstoreFlushSize);
        conf.set("hbase.regionserver.flush.policy", FlushAllStoresPolicy.class.getName());
        conf.setLong("hbase.hregion.percolumnfamilyflush.size.lower.bound", 409600L);
        conf.setInt("hbase.hstore.blockingStoreFiles", 10000);
        conf.set("hbase.regionserver.region.split.policy", ConstantSizeRegionSplitPolicy.class.getName());
        HTableDescriptor htd = new HTableDescriptor(TABLENAME);
        // Compaction is disabled on the table so store file counts are not reduced by it.
        htd.setCompactionEnabled(false);
        htd.addFamily(new HColumnDescriptor(FAMILY1));
        htd.addFamily(new HColumnDescriptor(FAMILY2));
        htd.addFamily(new HColumnDescriptor(FAMILY3));

        LOG.info("==============Test with selective flush disabled===============");
        int[] flushAllCounts = this.loadAndCountStoreFiles(conf, htd, memstoreFlushSize);

        LOG.info("==============Test with selective flush enabled===============");
        conf.set("hbase.regionserver.flush.policy", FlushLargeStoresPolicy.class.getName());
        int[] selectiveCounts = this.loadAndCountStoreFiles(conf, htd, memstoreFlushSize);

        LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + flushAllCounts[0] + ", " + Bytes.toString(FAMILY2) + "=>" + flushAllCounts[1] + ", " + Bytes.toString(FAMILY3) + "=>" + flushAllCounts[2]);
        LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + selectiveCounts[0] + ", " + Bytes.toString(FAMILY2) + "=>" + selectiveCounts[1] + ", " + Bytes.toString(FAMILY3) + "=>" + selectiveCounts[2]);
        Assert.assertTrue(selectiveCounts[0] < flushAllCounts[0]);
        Assert.assertTrue(selectiveCounts[1] < flushAllCounts[1]);
    }

    /**
     * Runs one load phase: starts a one-server mini cluster, creates the table, waits for it to
     * become available, writes the {@code doPut} data set and returns the store file counts of
     * CF1, CF2 and CF3 (in that order). The cluster is always shut down before returning.
     */
    private int[] loadAndCountStoreFiles(Configuration conf, HTableDescriptor htd, long memstoreFlushSize) throws Exception {
        try {
            TEST_UTIL.startMiniCluster(1);
            TEST_UTIL.getHBaseAdmin().createNamespace(NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
            TEST_UTIL.getHBaseAdmin().createTable(htd);
            // Both phases wait here; the second phase previously skipped this and could look
            // the region up before it was online.
            TEST_UTIL.waitTableAvailable(TABLENAME);
            // Resources close in reverse order: the table first, then the connection.
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TABLENAME)) {
                this.doPut(table, memstoreFlushSize);
            }
            Pair<Region, HRegionServer> hosted = TestPerColumnFamilyFlush.getRegionWithName(TABLENAME);
            Assert.assertNotNull("Could not find a region which hosts the new region.", hosted);
            Region loadedRegion = hosted.getFirst();
            return new int[]{
                loadedRegion.getStore(FAMILY1).getStorefilesCount(),
                loadedRegion.getStore(FAMILY2).getStorefilesCount(),
                loadedRegion.getStore(FAMILY3).getStorefilesCount()
            };
        } finally {
            TEST_UTIL.shutdownMiniCluster();
        }
    }

    /**
     * Stand-alone load generator against the cluster described by the default HBase
     * configuration. It drops and recreates {@link #TABLENAME} with families CF1..CF3 and writes
     * {@code numRows} rows keyed by the MD5 hash of the row index, with 16-, 256- and 4096-byte
     * values in CF1, CF2 and CF3 respectively.
     *
     * @param args {@code args[0]} = number of regions (the table is pre-split only when this is
     *        at least 3), {@code args[1]} = number of rows to write
     * @throws IllegalArgumentException if fewer than two arguments are given
     * @throws Exception if an argument is not a number or any cluster operation fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException("Usage: TestPerColumnFamilyFlush <numRegions> <numRows>");
        }
        int numRegions = Integer.parseInt(args[0]);
        long numRows = Long.parseLong(args[1]);
        HTableDescriptor htd = new HTableDescriptor(TABLENAME);
        htd.setMaxFileSize(0x280000000L); // 10 GiB
        htd.setValue("SPLIT_POLICY", ConstantSizeRegionSplitPolicy.class.getName());
        htd.addFamily(new HColumnDescriptor(FAMILY1));
        htd.addFamily(new HColumnDescriptor(FAMILY2));
        htd.addFamily(new HColumnDescriptor(FAMILY3));
        Configuration conf = HBaseConfiguration.create();
        // try-with-resources so the connection, admin and table are released on failure too.
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            try (Admin admin = conn.getAdmin()) {
                if (admin.tableExists(TABLENAME)) {
                    admin.disableTable(TABLENAME);
                    admin.deleteTable(TABLENAME);
                }
                if (numRegions >= 3) {
                    // Pre-split between the all-0x00 and all-0xFF 16-byte keys.
                    byte[] startKey = new byte[16];
                    byte[] endKey = new byte[16];
                    Arrays.fill(endKey, (byte)-1);
                    admin.createTable(htd, startKey, endKey, numRegions);
                } else {
                    admin.createTable(htd);
                }
            }
            try (Table table = conn.getTable(TABLENAME)) {
                byte[] qf = Bytes.toBytes("qf");
                Random rand = new Random();
                byte[] value1 = new byte[16];
                byte[] value2 = new byte[256];
                byte[] value3 = new byte[4096];
                for (long i = 0L; i < numRows; ++i) {
                    Put put = new Put(Hashing.md5().hashLong(i).asBytes());
                    // Re-seeding with the row index makes each row's values reproducible.
                    rand.setSeed(i);
                    rand.nextBytes(value1);
                    rand.nextBytes(value2);
                    rand.nextBytes(value3);
                    put.addColumn(FAMILY1, qf, value1);
                    put.addColumn(FAMILY2, qf, value2);
                    put.addColumn(FAMILY3, qf, value3);
                    table.put(put);
                    if (i % 10000L == 0L) {
                        LOG.info(i + " rows put");
                    }
                }
            }
        }
    }
}

