/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ReplicationTests.class, MediumTests.class})
public class TestBulkLoadReplication
extends TestReplicationBase {
    protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class);
    private static final String PEER1_CLUSTER_ID = "peer1";
    private static final String PEER4_CLUSTER_ID = "peer4";
    private static final String PEER3_CLUSTER_ID = "peer3";
    private static final String PEER_ID1 = "1";
    private static final String PEER_ID3 = "3";
    private static final String PEER_ID4 = "4";
    private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
    private static CountDownLatch BULK_LOAD_LATCH;
    private static HBaseTestingUtility utility3;
    private static HBaseTestingUtility utility4;
    private static Configuration conf3;
    private static Configuration conf4;
    @Rule
    public TestName name = new TestName();
    @ClassRule
    public static TemporaryFolder testFolder;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TestBulkLoadReplication.setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
        conf3 = HBaseConfiguration.create(conf1);
        TestBulkLoadReplication.setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
        conf3.set("zookeeper.znode.parent", "/3");
        utility3 = new HBaseTestingUtility(conf3);
        conf4 = HBaseConfiguration.create(conf1);
        TestBulkLoadReplication.setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
        conf4.set("zookeeper.znode.parent", "/4");
        utility4 = new HBaseTestingUtility(conf4);
        TestReplicationBase.setUpBeforeClass();
        TestBulkLoadReplication.startCluster(utility3, conf3);
        TestBulkLoadReplication.startCluster(utility4, conf4);
    }

    private static void startCluster(HBaseTestingUtility util, Configuration configuration) throws Exception {
        LOG.info("Setup Zk to same one from utility1 and utility4");
        util.setZkCluster(utility1.getZkCluster());
        util.startMiniCluster(2);
        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor(famName);
        columnDesc.setScope(1);
        tableDesc.addFamily(columnDesc);
        Connection connection = ConnectionFactory.createConnection(configuration);
        try (Admin admin = connection.getAdmin();){
            admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
        }
        util.waitUntilAllRegionsAssigned(tableName);
    }

    @Before
    public void setUpBase() throws Exception {
        ReplicationPeerConfig peer1Config = this.getPeerConfigForCluster(utility1);
        ReplicationPeerConfig peer4Config = this.getPeerConfigForCluster(utility4);
        ReplicationPeerConfig peer3Config = this.getPeerConfigForCluster(utility3);
        this.getReplicationAdmin(utility1.getConfiguration()).addPeer(PEER_ID4, peer4Config);
        ReplicationAdmin admin4 = this.getReplicationAdmin(utility4.getConfiguration());
        admin4.addPeer(PEER_ID1, peer1Config);
        admin4.addPeer(PEER_ID3, peer3Config);
        this.getReplicationAdmin(utility3.getConfiguration()).addPeer(PEER_ID4, peer4Config);
        this.setupCoprocessor(utility1);
        this.setupCoprocessor(utility4);
        this.setupCoprocessor(utility3);
    }

    private ReplicationAdmin getReplicationAdmin(Configuration configuration) throws IOException {
        return new ReplicationAdmin(configuration);
    }

    private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
        ReplicationPeerConfig config = new ReplicationPeerConfig();
        config.setClusterKey(util.getClusterKey());
        return config;
    }

    private void setupCoprocessor(HBaseTestingUtility cluster) throws IOException {
        for (HRegion region : cluster.getHBaseCluster().getRegions(tableName)) {
            region.getCoprocessorHost().load(BulkReplicationTestObserver.class, 0, cluster.getConfiguration());
        }
    }

    @After
    public void tearDownBase() throws Exception {
        this.getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID1);
        this.getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID3);
        this.getReplicationAdmin(utility3.getConfiguration()).removePeer(PEER_ID4);
    }

    private static void setupBulkLoadConfigsForCluster(Configuration config, String clusterReplicationId) throws Exception {
        config.setBoolean("hbase.replication.bulkload.enabled", true);
        config.set("hbase.replication.cluster.id", clusterReplicationId);
        File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
        File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
        config.writeXml((OutputStream)new FileOutputStream(sourceConfigFile));
        config.set("hbase.replication.conf.dir", testFolder.getRoot().getAbsolutePath());
    }

    @Test
    public void testBulkLoadReplicationActiveActive() throws Exception {
        Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
        Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
        Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
        byte[] row = Bytes.toBytes("001");
        byte[] value = Bytes.toBytes("v1");
        this.assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
        row = Bytes.toBytes("002");
        value = Bytes.toBytes("v2");
        this.assertBulkLoadConditions(row, value, utility4, peer4TestTable, peer1TestTable, peer3TestTable);
        row = Bytes.toBytes("003");
        value = Bytes.toBytes("v3");
        this.assertBulkLoadConditions(row, value, utility3, peer3TestTable, peer4TestTable, peer1TestTable);
        Thread.sleep(400L);
        Assert.assertEquals((long)9L, (long)BULK_LOADS_COUNT.get());
    }

    private void assertBulkLoadConditions(byte[] row, byte[] value, HBaseTestingUtility utility, Table ... tables) throws Exception {
        BULK_LOAD_LATCH = new CountDownLatch(3);
        this.bulkLoadOnCluster(row, value, utility);
        Assert.assertTrue((boolean)BULK_LOAD_LATCH.await(1L, TimeUnit.MINUTES));
        this.assertTableHasValue(tables[0], row, value);
        this.assertTableHasValue(tables[1], row, value);
        this.assertTableHasValue(tables[2], row, value);
    }

    private void bulkLoadOnCluster(byte[] row, byte[] value, HBaseTestingUtility cluster) throws Exception {
        String bulkLoadFile = this.createHFileForFamilies(row, value, cluster.getConfiguration());
        this.copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
        LoadIncrementalHFiles bulkLoadHFilesTool = new LoadIncrementalHFiles(cluster.getConfiguration());
        bulkLoadHFilesTool.run(new String[]{"/bulk_dir/region1/", tableName.getNameAsString()});
    }

    private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
        Path bulkLoadDir = new Path("/bulk_dir/region1/f");
        cluster.getFileSystem().mkdirs(bulkLoadDir);
        cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
    }

    private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
        Get get = new Get(row);
        Result result = table.get(get);
        Assert.assertTrue((boolean)result.advance());
        Assert.assertEquals((Object)Bytes.toString(value), (Object)Bytes.toString(result.value()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig) throws IOException {
        KeyValue kv = new KeyValue(row, famName, Bytes.toBytes(PEER_ID1), System.currentTimeMillis(), KeyValue.Type.Put, value);
        HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
        File hFileLocation = testFolder.newFile();
        try (FSDataOutputStream out = new FSDataOutputStream((OutputStream)new FileOutputStream(hFileLocation), null);){
            hFileFactory.withOutputStream(out);
            hFileFactory.withFileContext(new HFileContext());
            try (HFile.Writer writer = hFileFactory.create();){
                writer.append(kv);
            }
        }
        return hFileLocation.getAbsoluteFile().getAbsolutePath();
    }

    static {
        testFolder = new TemporaryFolder();
    }

    public static class BulkReplicationTestObserver
    extends BaseRegionObserver {
        @Override
        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, List<Pair<byte[], String>> familyPaths) throws IOException {
            BULK_LOADS_COUNT.incrementAndGet();
        }

        @Override
        public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
            if (hasLoaded) {
                BULK_LOAD_LATCH.countDown();
            }
            return true;
        }
    }
}

