package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;

@Category({MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/TestReplicationSource.class */
public class TestReplicationSource {
    // File system of the mini DFS cluster; assigned in setUpBeforeClass().
    private static FileSystem FS;
    // Old-WAL directory under the test root dir; assigned in setUpBeforeClass().
    private static Path oldLogDir;
    // WAL directory under the test root dir; assigned in setUpBeforeClass().
    private static Path logDir;
    // Logger shared by the tests and by ShutdownDelayRegionServer.
    private static final Log LOG = LogFactory.getLog(TestReplicationSource.class);
    // Testing utility that hosts the mini DFS cluster and the source-side mini cluster.
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    // Testing utility for the peer cluster started in testServerShutdownRecoveredQueue().
    private static final HBaseTestingUtility TEST_UTIL_PEER = new HBaseTestingUtility();
    // Configuration obtained from TEST_UTIL; several tests call set()/setInt() on it directly.
    private static Configuration conf = TEST_UTIL.getConfiguration();

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestReplicationSource$Mocks.class */
    /**
     * Bundle of Mockito mocks used to build a {@link ReplicationSource} for the
     * log-position tests. The mocked peers report every peer as enabled and the
     * mocked endpoint context hands out the mocked peer.
     */
    private static final class Mocks {
        private final ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
        private final ReplicationQueues queues = Mockito.mock(ReplicationQueues.class);
        private final ReplicationPeers peers = Mockito.mock(ReplicationPeers.class);
        private final MetricsSource metrics = Mockito.mock(MetricsSource.class);
        private final ReplicationPeer peer = Mockito.mock(ReplicationPeer.class);
        private final ReplicationEndpoint.Context context = Mockito.mock(ReplicationEndpoint.Context.class);

        private Mocks() {
            // Stub the two interactions the tests rely on.
            Mockito.when(peers.getStatusOfPeer(Matchers.anyString())).thenReturn(true);
            Mockito.when(context.getReplicationPeer()).thenReturn(peer);
        }

        /**
         * Initializes the given endpoint with the mocked context and then creates
         * and initializes a new {@link ReplicationSource} wired to the mocks.
         *
         * @param replicationEndpoint endpoint the source should replicate through
         * @return the initialized (not yet running) replication source
         * @throws IOException if endpoint or source initialization fails
         */
        ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint replicationEndpoint) throws IOException {
            ReplicationSource source = new ReplicationSource();
            replicationEndpoint.init(context);
            Stoppable stopper = Mockito.mock(Stoppable.class);
            UUID clusterId = UUID.randomUUID();
            source.init(
                    TestReplicationSource.conf,
                    TestReplicationSource.FS,
                    manager,
                    queues,
                    peers,
                    stopper,
                    "testPeerClusterZnode",
                    clusterId,
                    replicationEndpoint,
                    metrics);
            return source;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestReplicationSource$ShutdownDelayRegionServer.class */
    public static class ShutdownDelayRegionServer extends HRegionServer {
        public ShutdownDelayRegionServer(Configuration configuration) throws IOException, InterruptedException {
            super(configuration);
        }

        public ShutdownDelayRegionServer(Configuration configuration, CoordinatedStateManager coordinatedStateManager) throws IOException, InterruptedException {
            super(configuration, coordinatedStateManager);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.hadoop.hbase.regionserver.HRegionServer
        public void stopServiceThreads() {
            TestReplicationSource.LOG.info("Adding a delay to the regionserver shutdown");
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                TestReplicationSource.LOG.error("Interrupted while sleeping");
            }
            super.stopServiceThreads();
        }
    }

    /**
     * Starts a single-node mini DFS cluster, resolves the WAL and old-WAL
     * directories under a fresh root dir and removes them if they already exist.
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL.startMiniDFSCluster(1);
        FS = TEST_UTIL.getDFSCluster().getFileSystem();

        Path rootDir = TEST_UTIL.createRootDir();
        oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
        logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);

        // Same order as before: old-WAL directory first, then the WAL directory.
        for (Path dir : new Path[] {oldLogDir, logDir}) {
            if (FS.exists(dir)) {
                FS.delete(dir, true);
            }
        }
    }

    /**
     * Ensures the WAL and old-WAL directories exist and resets the static
     * counters and captured entries of {@code ReplicationEndpointForTest}.
     */
    @Before
    public void setup() throws IOException {
        for (Path dir : new Path[] {logDir, oldLogDir}) {
            boolean missing = !FS.exists(dir);
            if (missing) {
                FS.mkdirs(dir);
            }
        }

        // Static state of the test endpoint is shared between tests; start from zero.
        TestReplicationEndpoint.ReplicationEndpointForTest.contructedCount.set(0);
        TestReplicationEndpoint.ReplicationEndpointForTest.startedCount.set(0);
        TestReplicationEndpoint.ReplicationEndpointForTest.replicateCount.set(0);
        TestReplicationEndpoint.ReplicationEndpointForTest.stoppedCount.set(0);
        TestReplicationEndpoint.ReplicationEndpointForTest.lastEntries = null;
    }

    /**
     * Recursively deletes the old-WAL and WAL directories, if present, so each
     * test starts with empty directories.
     */
    @After
    public void tearDown() throws IOException {
        for (Path dir : new Path[] {oldLogDir, logDir}) {
            if (!FS.exists(dir)) {
                continue;
            }
            FS.delete(dir, true);
        }
    }

    /**
     * Shuts down the peer HBase cluster, the source HBase cluster and finally
     * the mini DFS cluster. Each step runs even if an earlier one throws, so a
     * failed shutdown does not leave the remaining clusters running.
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        try {
            TEST_UTIL_PEER.shutdownMiniHBaseCluster();
        } finally {
            try {
                TEST_UTIL.shutdownMiniHBaseCluster();
            } finally {
                TEST_UTIL.shutdownMiniDFSCluster();
            }
        }
    }

    /**
     * Writes three entries to a WAL file, starts reading it, renames the file
     * into the old-WAL directory mid-read and checks that the already-open
     * reader still returns the remaining entries followed by {@code null}.
     */
    @Test
    public void testLogMoving() throws Exception {
        Path logPath = new Path(logDir, "log");

        WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
        try {
            for (int i = 0; i < 3; i++) {
                byte[] b = Bytes.toBytes(Integer.toString(i));
                WALEdit edit = new WALEdit();
                edit.add(new KeyValue(b, b, b));
                WALKey key = new WALKey(b, TableName.valueOf(b), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID);
                writer.append(new WAL.Entry(key, edit));
                writer.sync();
            }
        } finally {
            // Close even when an append/sync fails so the file handle is not leaked.
            writer.close();
        }

        WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
        try {
            Assert.assertNotNull(reader.next());

            // Move the file while the reader is still open on it.
            Path oldLogPath = new Path(oldLogDir, "log");
            Assert.assertTrue("rename of " + logPath + " to " + oldLogPath + " failed",
                    FS.rename(logPath, oldLogPath));

            Assert.assertNotNull(reader.next());
            // Third of the three written entries.
            Assert.assertNotNull(reader.next());
            // Nothing left after the three entries.
            Assert.assertNull(reader.next());
        } finally {
            reader.close();
        }
    }

    /**
     * Checks that {@code ReplicationSource.terminate} returns in bounded time
     * even when the endpoint's {@code doStop()} never reports completion.
     * terminate() runs on a separate thread and the test waits, for at most
     * twice the configured "replication.source.sleepforretries" value
     * (default 1000 ms), for that call to finish.
     */
    @Test
    public void testTerminateTimeout() throws Exception {
        final ReplicationSource source = new ReplicationSource();
        HBaseInterClusterReplicationEndpoint endpoint = new HBaseInterClusterReplicationEndpoint() {
            @Override
            public void doStart() {
                notifyStarted();
            }

            @Override
            public void doStop() {
                // Intentionally empty: the endpoint never signals that it stopped.
            }
        };
        endpoint.start();

        ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
        Configuration testConf = HBaseConfiguration.create();
        testConf.setInt("replication.source.maxretriesmultiplier", 1);
        source.init(testConf, null, null, null, mockPeers, null, "testPeer", null, endpoint, null);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<?> terminated = executor.submit(new Runnable() {
                @Override
                public void run() {
                    source.terminate("testing source termination");
                }
            });
            long timeoutMs = testConf.getLong("replication.source.sleepforretries", 1000L) * 2;
            Waiter.waitFor(testConf, timeoutMs, new Waiter.Predicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return terminated.isDone();
                }
            });
        } finally {
            // Release the worker thread; also interrupts terminate() if the wait timed out.
            executor.shutdownNow();
        }
    }

    /**
     * Appends {@code i} entries to the given WAL writer, syncing after each
     * one, and closes the writer. Entry number {@code n} uses the bytes of the
     * decimal string of {@code n} as row, family, qualifier, region name and
     * table name, and carries a scope map with that same key mapped to 1.
     *
     * @param writer writer to append to; always closed before this method returns
     * @param i number of entries to append
     * @throws IOException if appending, syncing or closing fails
     */
    private void appendEntries(WALProvider.Writer writer, int i) throws IOException {
        try {
            for (int n = 0; n < i; n++) {
                byte[] b = Bytes.toBytes(Integer.toString(n));
                WALEdit edit = new WALEdit();
                edit.add(new KeyValue(b, b, b));

                WALKey key = new WALKey(b, TableName.valueOf(b), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID);
                TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
                scopes.put(b, 1);
                key.setScopes(scopes);

                writer.append(new WAL.Entry(key, edit));
                writer.sync();
            }
        } finally {
            // Close on the failure path too, so the writer is never leaked.
            writer.close();
        }
    }

    /**
     * Opens the WAL at {@code path}, reads {@code i} entries and returns the
     * reader's position afterwards. The reader is closed before returning.
     *
     * @param wALFactory factory used to create the reader
     * @param path WAL file to read
     * @param i number of entries to read before sampling the position
     * @return the reader position after {@code i} calls to {@code next()}
     * @throws IOException if opening, reading or closing the WAL fails
     */
    private long getPosition(WALFactory wALFactory, Path path, int i) throws IOException {
        WAL.Reader reader = wALFactory.createReader(FS, path);
        try {
            for (int read = 0; read < i; read++) {
                reader.next();
            }
            return reader.getPosition();
        } finally {
            reader.close();
        }
    }

    /**
     * Enqueues two WALs (3 and 2 entries) on a source whose batch capacity is
     * set to 5 and whose endpoint has no entry filter. Once the endpoint has
     * replicated at least once, verifies that the mocked manager received
     * exactly one logPositionAndCleanOldLogs call, for log.2 at the position
     * after its two entries, and that the last shipped batch held 5 entries.
     */
    @Test
    public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception {
        conf.setInt("replication.source.nb.capacity", 5);
        Mocks mocks = new Mocks();
        final TestReplicationEndpoint.ReplicationEndpointForTest endpoint =
                new TestReplicationEndpoint.ReplicationEndpointForTest() {
                    @Override
                    public WALEntryFilter getWALEntryfilter() {
                        return null;
                    }
                };

        WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
        Path log1 = new Path(logDir, "log.1");
        Path log2 = new Path(logDir, "log.2");
        WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
        WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
        appendEntries(writer1, 3);
        appendEntries(writer2, 2);
        long expectedPosition = getPosition(wals, log2, 2);

        ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
        source.run();
        source.enqueueLog(log1);
        // Second enqueue stands in for a log roll.
        source.enqueueLog(log2);

        Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
                return TestReplicationEndpoint.ReplicationEndpointForTest.replicateCount.get() > 0;
            }
        });

        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
        ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
        Mockito.verify(mocks.manager, VerificationModeFactory.times(1))
                .logPositionAndCleanOldLogs(
                        pathCaptor.capture(),
                        Matchers.anyString(),
                        positionCaptor.capture().longValue(),
                        Matchers.anyBoolean(),
                        Matchers.anyBoolean());
        Assert.assertTrue(TestReplicationEndpoint.ReplicationEndpointForTest.lastEntries.size() == 5);
        MatcherAssert.assertThat(pathCaptor.getValue(), CoreMatchers.is(log2));
        MatcherAssert.assertThat(positionCaptor.getValue(), CoreMatchers.is(Long.valueOf(expectedPosition)));
    }

    /**
     * Enqueues two WALs that were created and closed without any entries.
     * Waits until the source reports log.2 as its last logged path at (or
     * beyond) the expected position, then verifies that the mocked manager
     * received exactly one logPositionAndCleanOldLogs call with that path and
     * position.
     */
    @Test
    public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception {
        Mocks mocks = new Mocks();
        final ReplicationSource source =
                mocks.createReplicationSourceWithMocks(new TestReplicationEndpoint.ReplicationEndpointForTest());
        WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");

        Path log1 = new Path(logDir, "log.1");
        final Path log2 = new Path(logDir, "log.2");
        // Create both files but write no entries.
        WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()).close();
        WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration()).close();
        final long expectedPosition = getPosition(wals, log2, 0);

        source.run();
        source.enqueueLog(log1);
        source.enqueueLog(log2);

        Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
                if (!log2.equals(source.getLastLoggedPath())) {
                    return false;
                }
                return source.getLastLoggedPosition() >= expectedPosition;
            }
        });

        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
        ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
        Mockito.verify(mocks.manager, VerificationModeFactory.times(1))
                .logPositionAndCleanOldLogs(
                        pathCaptor.capture(),
                        Matchers.anyString(),
                        positionCaptor.capture().longValue(),
                        Matchers.anyBoolean(),
                        Matchers.anyBoolean());
        MatcherAssert.assertThat(pathCaptor.getValue(), CoreMatchers.is(log2));
        MatcherAssert.assertThat(positionCaptor.getValue(), CoreMatchers.is(Long.valueOf(expectedPosition)));
    }

    /**
     * Stubs the mocked peer so its table-CF map only contains
     * "replicated_table" (with an empty CF list), while the WAL entries written
     * here belong to tables named "0", "1" and "2". Waits until the source
     * reports log.2 as its last logged path at (or beyond) the expected
     * position, then verifies that the mocked manager received exactly one
     * logPositionAndCleanOldLogs call with that path and position.
     */
    @Test
    public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exception {
        Mocks mocks = new Mocks();
        Mockito.when(mocks.peer.getTableCFs())
                .thenReturn(Collections.singletonMap(
                        TableName.valueOf("replicated_table"), Collections.<String>emptyList()));

        WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
        Path log1 = new Path(logDir, "log.1");
        final Path log2 = new Path(logDir, "log.2");
        WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
        WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
        appendEntries(writer1, 3);
        appendEntries(writer2, 2);
        final long expectedPosition = getPosition(wals, log2, 2);

        final ReplicationSource source =
                mocks.createReplicationSourceWithMocks(new TestReplicationEndpoint.ReplicationEndpointForTest());
        // Here both logs are enqueued before the source is run.
        source.enqueueLog(log1);
        source.enqueueLog(log2);
        source.run();

        Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
                if (!log2.equals(source.getLastLoggedPath())) {
                    return false;
                }
                return source.getLastLoggedPosition() >= expectedPosition;
            }
        });

        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
        ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
        Mockito.verify(mocks.manager, VerificationModeFactory.times(1))
                .logPositionAndCleanOldLogs(
                        pathCaptor.capture(),
                        Matchers.anyString(),
                        positionCaptor.capture().longValue(),
                        Matchers.anyBoolean(),
                        Matchers.anyBoolean());
        MatcherAssert.assertThat(pathCaptor.getValue(), CoreMatchers.is(log2));
        MatcherAssert.assertThat(positionCaptor.getValue(), CoreMatchers.is(Long.valueOf(expectedPosition)));
    }

    /**
     * Starts a two-region-server source cluster (using
     * {@link ShutdownDelayRegionServer}) and a one-node peer cluster, adds and
     * then disables a peer, and stops the region servers one after another
     * while checking the number of "old sources" held by the surviving
     * server's replication manager: 1 after the first stop, 2 on a freshly
     * started server after the second stop, and 0 once the peer is re-enabled.
     * The region server implementation setting is restored and the
     * {@link ReplicationAdmin} is closed whether or not the test passes.
     */
    @Test
    public void testServerShutdownRecoveredQueue() throws Exception {
        ReplicationAdmin replicationAdmin = null;
        try {
            conf.set(WALFactory.WAL_PROVIDER, "defaultProvider");
            conf.setInt("replication.sleep.before.failover", 2000);
            conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
            MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
            TEST_UTIL_PEER.startMiniCluster(1);

            HRegionServer serverA = cluster.getRegionServer(0);
            final ReplicationSourceManager managerA =
                    ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
            HRegionServer serverB = cluster.getRegionServer(1);
            final ReplicationSourceManager managerB =
                    ((Replication) serverB.getReplicationSourceService()).getReplicationManager();

            replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
            replicationAdmin.addPeer(
                    "TestPeer",
                    new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()),
                    (Map<TableName, ? extends Collection<String>>) null);

            // Wait until both region servers have a source for the new peer.
            Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return !managerA.getSources().isEmpty() && !managerB.getSources().isEmpty();
                }
            });

            // Disable the peer before stopping servers.
            replicationAdmin.disablePeer("TestPeer");

            // Stop the first server and wait for the second to report one old source.
            cluster.stopRegionServer(serverA.getServerName());
            Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return managerB.getOldSources().size() == 1;
                }
            });

            final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
            serverC.waitForServerOnline();
            Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return serverC.getReplicationSourceService() != null;
                }
            });
            final ReplicationSourceManager managerC =
                    ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
            // The freshly started server has not claimed anything yet.
            Assert.assertEquals(0L, managerC.getOldSources().size());

            // Stop the second server; the new server should end up with two old sources.
            cluster.stopRegionServer(serverB.getServerName());
            Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return managerC.getOldSources().size() == 2;
                }
            });

            // After re-enabling the peer the old sources are expected to disappear.
            replicationAdmin.enablePeer("TestPeer");
            Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return managerC.getOldSources().size() == 0;
                }
            });
        } finally {
            // Always restore the default region server implementation for other tests.
            conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
            if (replicationAdmin != null) {
                replicationAdmin.close();
            }
        }
    }
}
