package org.apache.hadoop.hbase.regionserver;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category({MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.class */
public class TestSplitLogWorker {
    private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
    private static final int WAIT_TIME = 15000;
    private static final HBaseTestingUtility TEST_UTIL;
    private DummyServer ds;
    private ZooKeeperWatcher zkw;
    private SplitLogWorker slw;
    private ExecutorService executorService;
    private ZooKeeperProtos.SplitLogTask.RecoveryMode mode;
    private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
    SplitLogWorker.TaskExecutor neverEndingTask = new SplitLogWorker.TaskExecutor() { // from class: org.apache.hadoop.hbase.regionserver.TestSplitLogWorker.2
        @Override // org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor
        public SplitLogWorker.TaskExecutor.Status exec(String str, ZooKeeperProtos.SplitLogTask.RecoveryMode recoveryMode, CancelableProgressable cancelableProgressable) {
            do {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    return SplitLogWorker.TaskExecutor.Status.PREEMPTED;
                }
            } while (cancelableProgressable.progress());
            return SplitLogWorker.TaskExecutor.Status.PREEMPTED;
        }
    };

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/TestSplitLogWorker$DummyServer.class */
    class DummyServer implements Server {
        private ZooKeeperWatcher zkw;
        private Configuration conf;
        private CoordinatedStateManager cm;

        public DummyServer(ZooKeeperWatcher zooKeeperWatcher, Configuration configuration) {
            this.zkw = zooKeeperWatcher;
            this.conf = configuration;
            this.cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(configuration);
            this.cm.initialize(this);
        }

        @Override // org.apache.hadoop.hbase.Abortable
        public void abort(String str, Throwable th) {
        }

        @Override // org.apache.hadoop.hbase.Abortable
        public boolean isAborted() {
            return false;
        }

        @Override // org.apache.hadoop.hbase.Stoppable
        public void stop(String str) {
        }

        @Override // org.apache.hadoop.hbase.Stoppable
        public boolean isStopped() {
            return false;
        }

        @Override // org.apache.hadoop.hbase.Server
        public Configuration getConfiguration() {
            return this.conf;
        }

        @Override // org.apache.hadoop.hbase.Server
        public ZooKeeperWatcher getZooKeeper() {
            return this.zkw;
        }

        @Override // org.apache.hadoop.hbase.Server
        public ServerName getServerName() {
            return null;
        }

        @Override // org.apache.hadoop.hbase.Server
        public CoordinatedStateManager getCoordinatedStateManager() {
            return this.cm;
        }

        @Override // org.apache.hadoop.hbase.Server
        public ClusterConnection getConnection() {
            return null;
        }

        @Override // org.apache.hadoop.hbase.Server
        public MetaTableLocator getMetaTableLocator() {
            return null;
        }

        @Override // org.apache.hadoop.hbase.Server
        public ChoreService getChoreService() {
            return null;
        }
    }

    private void waitForCounter(AtomicLong atomicLong, long j, long j2, long j3) throws Exception {
        Assert.assertTrue("ctr=" + atomicLong.get() + ", oldval=" + j + ", newval=" + j2, waitForCounterBoolean(atomicLong, j, j2, j3));
    }

    private boolean waitForCounterBoolean(AtomicLong atomicLong, long j, long j2, long j3) throws Exception {
        return waitForCounterBoolean(atomicLong, j, j2, j3, true);
    }

    private boolean waitForCounterBoolean(final AtomicLong atomicLong, long j, final long j2, long j3, boolean z) throws Exception {
        if (TEST_UTIL.waitFor(j3, 10L, z, new Waiter.Predicate<Exception>() { // from class: org.apache.hadoop.hbase.regionserver.TestSplitLogWorker.1
            @Override // org.apache.hadoop.hbase.Waiter.Predicate
            public boolean evaluate() throws Exception {
                return atomicLong.get() >= j2;
            }
        }) <= 0) {
            return true;
        }
        Assert.assertEquals(j2, atomicLong.get());
        return true;
    }

    @Before
    public void setup() throws Exception {
        TEST_UTIL.startMiniZKCluster();
        Configuration configuration = TEST_UTIL.getConfiguration();
        this.zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null);
        this.ds = new DummyServer(this.zkw, configuration);
        ZKUtil.deleteChildrenRecursively(this.zkw, this.zkw.baseZNode);
        ZKUtil.createAndFailSilent(this.zkw, this.zkw.baseZNode);
        Assert.assertThat(Integer.valueOf(ZKUtil.checkExists(this.zkw, this.zkw.baseZNode)), CoreMatchers.not(CoreMatchers.is(-1)));
        LOG.debug(this.zkw.baseZNode + " created");
        ZKUtil.createAndFailSilent(this.zkw, this.zkw.splitLogZNode);
        Assert.assertThat(Integer.valueOf(ZKUtil.checkExists(this.zkw, this.zkw.splitLogZNode)), CoreMatchers.not(CoreMatchers.is(-1)));
        LOG.debug(this.zkw.splitLogZNode + " created");
        ZKUtil.createAndFailSilent(this.zkw, this.zkw.rsZNode);
        Assert.assertThat(Integer.valueOf(ZKUtil.checkExists(this.zkw, this.zkw.rsZNode)), CoreMatchers.not(CoreMatchers.is(-1)));
        SplitLogCounters.resetCounters();
        this.executorService = new ExecutorService("TestSplitLogWorker");
        this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
        this.mode = configuration.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? ZooKeeperProtos.SplitLogTask.RecoveryMode.LOG_REPLAY : ZooKeeperProtos.SplitLogTask.RecoveryMode.LOG_SPLITTING;
    }

    @After
    public void teardown() throws Exception {
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        TEST_UTIL.shutdownMiniZKCluster();
    }

    @Test(timeout = 60000)
    public void testAcquireTaskAtStartup() throws Exception {
        LOG.info("testAcquireTaskAtStartup");
        SplitLogCounters.resetCounters();
        ServerName valueOf = ServerName.valueOf("rs,1,1");
        RegionServerServices regionServer = getRegionServer(valueOf);
        this.zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(this.zkw, "tatas"), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        SplitLogWorker splitLogWorker = new SplitLogWorker(this.ds, TEST_UTIL.getConfiguration(), regionServer, this.neverEndingTask);
        splitLogWorker.start();
        try {
            waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
            Assert.assertTrue(SplitLogTask.parseFrom(ZKUtil.getData(this.zkw, ZKSplitLog.getEncodedNodeName(this.zkw, "tatas"))).isOwned(valueOf));
            stopSplitLogWorker(splitLogWorker);
        } catch (Throwable th) {
            stopSplitLogWorker(splitLogWorker);
            throw th;
        }
    }

    private void stopSplitLogWorker(SplitLogWorker splitLogWorker) throws InterruptedException {
        if (splitLogWorker != null) {
            splitLogWorker.stop();
            splitLogWorker.worker.join(15000L);
            if (splitLogWorker.worker.isAlive()) {
                Assert.assertTrue(new StringBuilder().append("Could not stop the worker thread slw=").append(splitLogWorker).toString() == null);
            }
        }
    }

    @Test(timeout = 60000)
    public void testRaceForTask() throws Exception {
        LOG.info("testRaceForTask");
        SplitLogCounters.resetCounters();
        ServerName valueOf = ServerName.valueOf("svr1,1,1");
        ServerName valueOf2 = ServerName.valueOf("svr2,1,1");
        this.zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(this.zkw, "trft"), new SplitLogTask.Unassigned(this.MANAGER, this.mode).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        RegionServerServices regionServer = getRegionServer(valueOf);
        RegionServerServices regionServer2 = getRegionServer(valueOf2);
        SplitLogWorker splitLogWorker = new SplitLogWorker(this.ds, TEST_UTIL.getConfiguration(), regionServer, this.neverEndingTask);
        SplitLogWorker splitLogWorker2 = new SplitLogWorker(this.ds, TEST_UTIL.getConfiguration(), regionServer2, this.neverEndingTask);
        splitLogWorker.start();
        splitLogWorker2.start();
        try {
            waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
            Assert.assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0L, 1L, 15000L, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
            SplitLogTask parseFrom = SplitLogTask.parseFrom(ZKUtil.getData(this.zkw, ZKSplitLog.getEncodedNodeName(this.zkw, "trft")));
            Assert.assertTrue(parseFrom.isOwned(valueOf) || parseFrom.isOwned(valueOf2));
            stopSplitLogWorker(splitLogWorker);
            stopSplitLogWorker(splitLogWorker2);
        } catch (Throwable th) {
            stopSplitLogWorker(splitLogWorker);
            stopSplitLogWorker(splitLogWorker2);
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testPreemptTask() throws Exception {
        LOG.info("testPreemptTask");
        SplitLogCounters.resetCounters();
        ServerName valueOf = ServerName.valueOf("tpt_svr,1,1");
        String encodedNodeName = ZKSplitLog.getEncodedNodeName(this.zkw, "tpt_task");
        SplitLogWorker splitLogWorker = new SplitLogWorker(this.ds, TEST_UTIL.getConfiguration(), getRegionServer(valueOf), this.neverEndingTask);
        splitLogWorker.start();
        try {
            Thread.yield();
            Thread.sleep(1000L);
            waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0L, 1L, 15000L);
            this.zkw.getRecoverableZooKeeper().create(encodedNodeName, new SplitLogTask.Unassigned(this.MANAGER, this.mode).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
            Assert.assertEquals(1L, splitLogWorker.getTaskReadySeq());
            Assert.assertTrue(SplitLogTask.parseFrom(ZKUtil.getData(this.zkw, encodedNodeName)).isOwned(valueOf));
            ZKUtil.setData(this.zkw, encodedNodeName, new SplitLogTask.Owned(this.MANAGER, this.mode).toByteArray());
            waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0L, 1L, 15000L);
            stopSplitLogWorker(splitLogWorker);
        } catch (Throwable th) {
            stopSplitLogWorker(splitLogWorker);
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testMultipleTasks() throws Exception {
        LOG.info("testMultipleTasks");
        SplitLogCounters.resetCounters();
        ServerName valueOf = ServerName.valueOf("tmt_svr,1,1");
        String encodedNodeName = ZKSplitLog.getEncodedNodeName(this.zkw, "tmt_task");
        SplitLogWorker splitLogWorker = new SplitLogWorker(this.ds, TEST_UTIL.getConfiguration(), getRegionServer(valueOf), this.neverEndingTask);
        splitLogWorker.start();
        try {
            Thread.yield();
            Thread.sleep(100L);
            waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0L, 1L, 15000L);
            SplitLogTask.Unassigned unassigned = new SplitLogTask.Unassigned(this.MANAGER, this.mode);
            this.zkw.getRecoverableZooKeeper().create(encodedNodeName, unassigned.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
            String encodedNodeName2 = ZKSplitLog.getEncodedNodeName(this.zkw, "tmt_task_2");
            this.zkw.getRecoverableZooKeeper().create(encodedNodeName2, unassigned.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            ZKUtil.setData(this.zkw, encodedNodeName, new SplitLogTask.Owned(ServerName.valueOf("another-worker,1,1"), this.mode).toByteArray());
            waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0L, 1L, 15000L);
            waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1L, 2L, 15000L);
            Assert.assertEquals(2L, splitLogWorker.getTaskReadySeq());
            Assert.assertTrue(SplitLogTask.parseFrom(ZKUtil.getData(this.zkw, encodedNodeName2)).isOwned(valueOf));
            stopSplitLogWorker(splitLogWorker);
        } catch (Throwable th) {
            stopSplitLogWorker(splitLogWorker);
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testRescan() throws Exception {
        LOG.info("testRescan");
        SplitLogCounters.resetCounters();
        ServerName valueOf = ServerName.valueOf("svr,1,1");
        this.slw = new SplitLogWorker(this.ds, TEST_UTIL.getConfiguration(), getRegionServer(valueOf), this.neverEndingTask);
        this.slw.start();
        Thread.yield();
        Thread.sleep(100L);
        String encodedNodeName = ZKSplitLog.getEncodedNodeName(this.zkw, "task");
        SplitLogTask.Unassigned unassigned = new SplitLogTask.Unassigned(this.MANAGER, this.mode);
        this.zkw.getRecoverableZooKeeper().create(encodedNodeName, unassigned.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 1L, 15000L);
        ZKUtil.setData(this.zkw, encodedNodeName, unassigned.toByteArray());
        waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0L, 1L, 15000L);
        this.zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(this.zkw, "RESCAN"), unassigned.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1L, 2L, 15000L);
        ZKUtil.setData(this.zkw, encodedNodeName, unassigned.toByteArray());
        waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1L, 2L, 15000L);
        waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0L, 1L, 15000L);
        List<String> listChildrenNoWatch = ZKUtil.listChildrenNoWatch(this.zkw, this.zkw.splitLogZNode);
        LOG.debug(listChildrenNoWatch);
        int i = 0;
        for (String str : listChildrenNoWatch) {
            i++;
            if (str.startsWith("RESCAN")) {
                SplitLogTask parseFrom = SplitLogTask.parseFrom(ZKUtil.getData(this.zkw, ZKUtil.joinZNode(this.zkw.splitLogZNode, ZKSplitLog.getFileName(ZKSplitLog.getEncodedNodeName(this.zkw, str)))));
                Assert.assertTrue(parseFrom.toString(), parseFrom.isDone(valueOf));
            }
        }
        Assert.assertEquals(2L, i);
    }

    @Test(timeout = 60000)
    public void testAcquireMultiTasks() throws Exception {
        LOG.info("testAcquireMultiTasks");
        SplitLogCounters.resetCounters();
        ServerName valueOf = ServerName.valueOf("rs,1,1");
        Configuration create = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
        create.setInt("hbase.regionserver.wal.max.splitters", 3);
        RegionServerServices regionServer = getRegionServer(valueOf);
        for (int i = 0; i < 3; i++) {
            this.zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(this.zkw, "tatas" + i), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        SplitLogWorker splitLogWorker = new SplitLogWorker(this.ds, create, regionServer, this.neverEndingTask);
        splitLogWorker.start();
        try {
            waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0L, 3L, 15000L);
            for (int i2 = 0; i2 < 3; i2++) {
                Assert.assertTrue(SplitLogTask.parseFrom(ZKUtil.getData(this.zkw, ZKSplitLog.getEncodedNodeName(this.zkw, "tatas" + i2))).isOwned(valueOf));
            }
        } finally {
            stopSplitLogWorker(splitLogWorker);
        }
    }

    private RegionServerServices getRegionServer(ServerName serverName) {
        RegionServerServices regionServerServices = (RegionServerServices) Mockito.mock(RegionServerServices.class);
        Mockito.when(regionServerServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
        Mockito.when(regionServerServices.getServerName()).thenReturn(serverName);
        Mockito.when(regionServerServices.getZooKeeper()).thenReturn(this.zkw);
        Mockito.when(Boolean.valueOf(regionServerServices.isStopped())).thenReturn(false);
        Mockito.when(regionServerServices.getExecutorService()).thenReturn(this.executorService);
        return regionServerServices;
    }

    static {
        Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
        TEST_UTIL = new HBaseTestingUtility();
    }
}
