package org.apache.hadoop.hbase.regionserver;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ReplicationTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.class */
public class TestBulkLoadReplication extends TestReplicationBase {
    private static final String PEER1_CLUSTER_ID = "peer1";
    private static final String PEER4_CLUSTER_ID = "peer4";
    private static final String PEER3_CLUSTER_ID = "peer3";
    private static final String PEER_ID1 = "1";
    private static final String PEER_ID3 = "3";
    private static final String PEER_ID4 = "4";
    private static CountDownLatch BULK_LOAD_LATCH;
    private static HBaseTestingUtility utility3;
    private static HBaseTestingUtility utility4;
    private static Configuration conf3;
    private static Configuration conf4;

    @Rule
    public TestName name = new TestName();
    protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class);
    private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);

    @ClassRule
    public static TemporaryFolder testFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication$BulkReplicationTestObserver.class */
    public static class BulkReplicationTestObserver extends BaseRegionObserver {
        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> observerContext, List<Pair<byte[], String>> list) throws IOException {
            TestBulkLoadReplication.BULK_LOADS_COUNT.incrementAndGet();
        }

        public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> observerContext, List<Pair<byte[], String>> list, boolean z) throws IOException {
            if (!z) {
                return true;
            }
            TestBulkLoadReplication.BULK_LOAD_LATCH.countDown();
            return true;
        }
    }

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
        conf3 = HBaseConfiguration.create(conf1);
        setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
        conf3.set("zookeeper.znode.parent", "/3");
        utility3 = new HBaseTestingUtility(conf3);
        conf4 = HBaseConfiguration.create(conf1);
        setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
        conf4.set("zookeeper.znode.parent", "/4");
        utility4 = new HBaseTestingUtility(conf4);
        TestReplicationBase.setUpBeforeClass();
        startCluster(utility3, conf3);
        startCluster(utility4, conf4);
    }

    private static void startCluster(HBaseTestingUtility hBaseTestingUtility, Configuration configuration) throws Exception {
        LOG.info("Setup Zk to same one from utility1 and utility4");
        hBaseTestingUtility.setZkCluster(utility1.getZkCluster());
        hBaseTestingUtility.startMiniCluster(2);
        HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(famName);
        hColumnDescriptor.setScope(1);
        hTableDescriptor.addFamily(hColumnDescriptor);
        Admin admin = ConnectionFactory.createConnection(configuration).getAdmin();
        try {
            admin.createTable(hTableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
            if (admin != null) {
                admin.close();
            }
            hBaseTestingUtility.waitUntilAllRegionsAssigned(tableName);
        } catch (Throwable th) {
            if (admin != null) {
                try {
                    admin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Before
    public void setUpBase() throws Exception {
        ReplicationPeerConfig peerConfigForCluster = getPeerConfigForCluster(utility1);
        ReplicationPeerConfig peerConfigForCluster2 = getPeerConfigForCluster(utility4);
        ReplicationPeerConfig peerConfigForCluster3 = getPeerConfigForCluster(utility3);
        getReplicationAdmin(utility1.getConfiguration()).addPeer(PEER_ID4, peerConfigForCluster2);
        ReplicationAdmin replicationAdmin = getReplicationAdmin(utility4.getConfiguration());
        replicationAdmin.addPeer(PEER_ID1, peerConfigForCluster);
        replicationAdmin.addPeer(PEER_ID3, peerConfigForCluster3);
        getReplicationAdmin(utility3.getConfiguration()).addPeer(PEER_ID4, peerConfigForCluster2);
        setupCoprocessor(utility1);
        setupCoprocessor(utility4);
        setupCoprocessor(utility3);
    }

    private ReplicationAdmin getReplicationAdmin(Configuration configuration) throws IOException {
        return new ReplicationAdmin(configuration);
    }

    private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility hBaseTestingUtility) {
        ReplicationPeerConfig replicationPeerConfig = new ReplicationPeerConfig();
        replicationPeerConfig.setClusterKey(hBaseTestingUtility.getClusterKey());
        return replicationPeerConfig;
    }

    private void setupCoprocessor(HBaseTestingUtility hBaseTestingUtility) throws IOException {
        Iterator<HRegion> it = hBaseTestingUtility.getHBaseCluster().getRegions(tableName).iterator();
        while (it.hasNext()) {
            it.next().getCoprocessorHost().load(BulkReplicationTestObserver.class, 0, hBaseTestingUtility.getConfiguration());
        }
    }

    @After
    public void tearDownBase() throws Exception {
        getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID1);
        getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID3);
        getReplicationAdmin(utility3.getConfiguration()).removePeer(PEER_ID4);
    }

    private static void setupBulkLoadConfigsForCluster(Configuration configuration, String str) throws Exception {
        configuration.setBoolean("hbase.replication.bulkload.enabled", true);
        configuration.set("hbase.replication.cluster.id", str);
        configuration.writeXml(new FileOutputStream(new File(testFolder.newFolder(str).getAbsolutePath() + "/hbase-site.xml")));
        configuration.set("hbase.replication.conf.dir", testFolder.getRoot().getAbsolutePath());
    }

    @Test
    public void testBulkLoadReplicationActiveActive() throws Exception {
        Table table = utility1.getConnection().getTable(TestReplicationBase.tableName);
        Table table2 = utility4.getConnection().getTable(TestReplicationBase.tableName);
        Table table3 = utility3.getConnection().getTable(TestReplicationBase.tableName);
        assertBulkLoadConditions(Bytes.toBytes("001"), Bytes.toBytes("v1"), utility1, table, table2, table3);
        assertBulkLoadConditions(Bytes.toBytes("002"), Bytes.toBytes("v2"), utility4, table2, table, table3);
        assertBulkLoadConditions(Bytes.toBytes("003"), Bytes.toBytes("v3"), utility3, table3, table2, table);
        Thread.sleep(400L);
        Assert.assertEquals(9L, BULK_LOADS_COUNT.get());
    }

    private void assertBulkLoadConditions(byte[] bArr, byte[] bArr2, HBaseTestingUtility hBaseTestingUtility, Table... tableArr) throws Exception {
        BULK_LOAD_LATCH = new CountDownLatch(3);
        bulkLoadOnCluster(bArr, bArr2, hBaseTestingUtility);
        Assert.assertTrue(BULK_LOAD_LATCH.await(1L, TimeUnit.MINUTES));
        assertTableHasValue(tableArr[0], bArr, bArr2);
        assertTableHasValue(tableArr[1], bArr, bArr2);
        assertTableHasValue(tableArr[2], bArr, bArr2);
    }

    private void bulkLoadOnCluster(byte[] bArr, byte[] bArr2, HBaseTestingUtility hBaseTestingUtility) throws Exception {
        copyToHdfs(createHFileForFamilies(bArr, bArr2, hBaseTestingUtility.getConfiguration()), hBaseTestingUtility.getDFSCluster());
        new LoadIncrementalHFiles(hBaseTestingUtility.getConfiguration()).run(new String[]{"/bulk_dir/region1/", tableName.getNameAsString()});
    }

    private void copyToHdfs(String str, MiniDFSCluster miniDFSCluster) throws Exception {
        Path path = new Path("/bulk_dir/region1/f");
        miniDFSCluster.getFileSystem().mkdirs(path);
        miniDFSCluster.getFileSystem().copyFromLocalFile(new Path(str), path);
    }

    private void assertTableHasValue(Table table, byte[] bArr, byte[] bArr2) throws Exception {
        Result result = table.get(new Get(bArr));
        Assert.assertTrue(result.advance());
        Assert.assertEquals(Bytes.toString(bArr2), Bytes.toString(result.value()));
    }

    private String createHFileForFamilies(byte[] bArr, byte[] bArr2, Configuration configuration) throws IOException {
        KeyValue keyValue = new KeyValue(bArr, famName, Bytes.toBytes(PEER_ID1), System.currentTimeMillis(), KeyValue.Type.Put, bArr2);
        HFile.WriterFactory writerFactoryNoCache = HFile.getWriterFactoryNoCache(configuration);
        File newFile = testFolder.newFile();
        FSDataOutputStream fSDataOutputStream = new FSDataOutputStream(new FileOutputStream(newFile), (FileSystem.Statistics) null);
        try {
            writerFactoryNoCache.withOutputStream(fSDataOutputStream);
            writerFactoryNoCache.withFileContext(new HFileContext());
            HFile.Writer create = writerFactoryNoCache.create();
            try {
                create.append(keyValue);
                create.close();
                return newFile.getAbsoluteFile().getAbsolutePath();
            } catch (Throwable th) {
                create.close();
                throw th;
            }
        } finally {
            fSDataOutputStream.close();
        }
    }
}
