/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import drill.shaded.hbase.guava.com.google.common.collect.Sets;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={MediumTests.class})
public class TestReplicationSourceManager {
    private static final Log LOG = LogFactory.getLog(TestReplicationSourceManager.class);
    private static Configuration conf;
    private static HBaseTestingUtility utility;
    private static Replication replication;
    private static ReplicationSourceManager manager;
    private static ZooKeeperWatcher zkw;
    private static HTableDescriptor htd;
    private static HRegionInfo hri;
    private static final byte[] r1;
    private static final byte[] r2;
    private static final byte[] f1;
    private static final TableName test;
    private static final String slaveId = "1";
    private static FileSystem fs;
    private static Path oldLogDir;
    private static Path logDir;
    private static CountDownLatch latch;
    private static List<String> files;
    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        conf = HBaseConfiguration.create();
        conf.set("replication.replicationsource.implementation", ReplicationSourceDummy.class.getCanonicalName());
        conf.setBoolean("hbase.replication", true);
        conf.setLong("replication.sleep.before.failover", 2000L);
        conf.setInt("replication.source.maxretriesmultiplier", 10);
        utility = new HBaseTestingUtility(conf);
        utility.startMiniZKCluster();
        zkw = new ZooKeeperWatcher(conf, "test", null);
        ZKUtil.createWithParents(zkw, "/hbase/replication");
        ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
        ZKUtil.setData(zkw, "/hbase/replication/peers/1", Bytes.toBytes(conf.get("hbase.zookeeper.quorum") + ":" + conf.get("hbase.zookeeper.property.clientPort") + ":/1"));
        ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
        ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
        ZKUtil.createWithParents(zkw, "/hbase/replication/state");
        ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
        ZKClusterId.setClusterId(zkw, new ClusterId());
        FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
        fs = FileSystem.get((Configuration)conf);
        oldLogDir = new Path(utility.getDataTestDir(), "oldWALs");
        logDir = new Path(utility.getDataTestDir(), "WALs");
        replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
        manager = replication.getReplicationManager();
        manager.addSource(slaveId);
        htd = new HTableDescriptor(test);
        HColumnDescriptor col = new HColumnDescriptor("f1");
        col.setScope(1);
        htd.addFamily(col);
        col = new HColumnDescriptor("f2");
        col.setScope(0);
        htd.addFamily(col);
        hri = new HRegionInfo(htd.getTableName(), r1, r2);
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        manager.join();
        utility.shutdownMiniCluster();
    }

    private void cleanLogDir() throws IOException {
        fs.delete(logDir, true);
        fs.delete(oldLogDir, true);
    }

    @Before
    public void setUp() throws Exception {
        LOG.info((Object)("Start " + this.testName.getMethodName()));
        this.cleanLogDir();
    }

    @After
    public void tearDown() throws Exception {
        LOG.info((Object)("End " + this.testName.getMethodName()));
        this.cleanLogDir();
    }

    @Test
    public void testLogRoll() throws Exception {
        long baseline;
        long seq = 0L;
        long time = baseline = 1000L;
        KeyValue kv = new KeyValue(r1, f1, r1);
        WALEdit edit = new WALEdit();
        edit.add(kv);
        ArrayList<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
        listeners.add(replication);
        WALFactory wals = new WALFactory(utility.getConfiguration(), listeners, URLEncoder.encode("regionserver:60020", "UTF8"));
        WAL wal = wals.getWAL(hri.getEncodedNameAsBytes());
        AtomicLong sequenceId = new AtomicLong(1L);
        manager.init();
        HTableDescriptor htd = new HTableDescriptor();
        htd.addFamily(new HColumnDescriptor(f1));
        for (long i = 1L; i < 101L; ++i) {
            if (i > 1L && i % 20L == 0L) {
                wal.rollWriter();
            }
            LOG.info((Object)i);
            long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis()), edit, sequenceId, true, null);
            wal.sync(txid);
        }
        LOG.info((Object)(baseline + " and " + time));
        time = baseline += 101L;
        LOG.info((Object)(baseline + " and " + time));
        for (int i = 0; i < 3; ++i) {
            wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis()), edit, sequenceId, true, null);
        }
        wal.sync();
        Assert.assertEquals((long)6L, (long)manager.getWALs().get(slaveId).size());
        wal.rollWriter();
        manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), slaveId, 0L, false, false);
        wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis()), edit, sequenceId, true, null);
        wal.sync();
        Assert.assertEquals((long)1L, (long)manager.getWALs().size());
    }

    @Test
    public void testClaimQueues() throws Exception {
        conf.setBoolean("hbase.zookeeper.useMulti", true);
        DummyServer server = new DummyServer("hostname0.example.org");
        ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server);
        rq.init(server.getServerName().toString());
        files.add("log1");
        files.add("log2");
        for (String file : files) {
            rq.addLog(slaveId, file);
        }
        DummyServer s1 = new DummyServer("dummyserver1.example.org");
        DummyServer s2 = new DummyServer("dummyserver2.example.org");
        DummyServer s3 = new DummyServer("dummyserver3.example.org");
        DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName().getServerName(), s1);
        DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName().getServerName(), s2);
        DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName().getServerName(), s3);
        latch = new CountDownLatch(3);
        w1.start();
        w2.start();
        w3.start();
        int populatedMap = 0;
        latch.await();
        Assert.assertEquals((long)1L, (long)(populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated()));
        server.abort("", null);
    }

    @Test
    public void testCleanupFailoverQueues() throws Exception {
        DummyServer server = new DummyServer("hostname1.example.org");
        ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server);
        rq.init(server.getServerName().toString());
        TreeSet<String> files = new TreeSet<String>();
        files.add("log1");
        files.add("log2");
        for (String file : files) {
            rq.addLog(slaveId, file);
        }
        DummyServer s1 = new DummyServer("dummyserver1.example.org");
        ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
        rq1.init(s1.getServerName().toString());
        ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
        rp1.init();
        ReplicationSourceManager replicationSourceManager = manager;
        replicationSourceManager.getClass();
        ReplicationSourceManager.NodeFailoverWorker w1 = replicationSourceManager.new ReplicationSourceManager.NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(new Long(1L), new Long(2L)));
        w1.start();
        w1.join(5000L);
        Assert.assertEquals((long)1L, (long)manager.getWalsByIdRecoveredQueues().size());
        String id = "1-" + server.getServerName().getServerName();
        Assert.assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id));
        manager.cleanOldLogs("log2", id, true);
        Assert.assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id));
    }

    @Test
    public void testNodeFailoverDeadServerParsing() throws Exception {
        LOG.debug((Object)"testNodeFailoverDeadServerParsing");
        conf.setBoolean("hbase.zookeeper.useMulti", true);
        DummyServer server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
        ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
        repQueues.init(server.getServerName().toString());
        files.add("log1");
        files.add("log2");
        for (String file : files) {
            repQueues.addLog(slaveId, file);
        }
        DummyServer s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
        DummyServer s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
        DummyServer s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
        ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
        rq1.init(s1.getServerName().toString());
        SortedMap<String, SortedSet<String>> testMap = rq1.claimQueues(server.getServerName().getServerName());
        ReplicationQueues rq2 = ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
        rq2.init(s2.getServerName().toString());
        testMap = rq2.claimQueues(s1.getServerName().getServerName());
        ReplicationQueues rq3 = ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
        rq3.init(s3.getServerName().toString());
        testMap = rq3.claimQueues(s2.getServerName().getServerName());
        ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
        List<String> result = replicationQueueInfo.getDeadRegionServers();
        Assert.assertTrue((boolean)result.contains(server.getServerName().getServerName()));
        Assert.assertTrue((boolean)result.contains(s1.getServerName().getServerName()));
        Assert.assertTrue((boolean)result.contains(s2.getServerName().getServerName()));
        server.abort("", null);
    }

    @Test
    public void testFailoverDeadServerCversionChange() throws Exception {
        LOG.debug((Object)"testFailoverDeadServerCversionChange");
        conf.setBoolean("hbase.zookeeper.useMulti", true);
        DummyServer s0 = new DummyServer("cversion-change0.example.org");
        ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
        repQueues.init(s0.getServerName().toString());
        files.add("log1");
        files.add("log2");
        for (String file : files) {
            repQueues.addLog(slaveId, file);
        }
        DummyServer s1 = new DummyServer("cversion-change1.example.org");
        ReplicationQueues rq1 = ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
        rq1.init(s1.getServerName().toString());
        ReplicationQueuesClient client = ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
        int v0 = client.getQueuesZNodeCversion();
        rq1.claimQueues(s0.getServerName().getServerName());
        int v1 = client.getQueuesZNodeCversion();
        Assert.assertEquals((long)(v0 + 1), (long)v1);
        s0.abort("", null);
    }

    @Test
    public void testCleanupUnknownPeerZNode() throws Exception {
        DummyServer server = new DummyServer("hostname2.example.org");
        ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server);
        rq.init(server.getServerName().toString());
        String group = "testgroup";
        rq.addLog("2", group + ".log1");
        rq.addLog("2", group + ".log2");
        ReplicationSourceManager replicationSourceManager = manager;
        replicationSourceManager.getClass();
        ReplicationSourceManager.NodeFailoverWorker w1 = replicationSourceManager.new ReplicationSourceManager.NodeFailoverWorker(server.getServerName().getServerName());
        w1.run();
        for (String peer : manager.getAllQueues()) {
            Assert.assertTrue((boolean)peer.startsWith(slaveId));
        }
    }

    static {
        r1 = Bytes.toBytes("r1");
        r2 = Bytes.toBytes("r2");
        f1 = Bytes.toBytes("f1");
        test = TableName.valueOf("test");
        files = new ArrayList<String>();
    }

    static class DummyServer
    implements Server {
        String hostname;

        DummyServer() {
            this.hostname = "hostname.example.org";
        }

        DummyServer(String hostname) {
            this.hostname = hostname;
        }

        @Override
        public Configuration getConfiguration() {
            return conf;
        }

        @Override
        public ZooKeeperWatcher getZooKeeper() {
            return zkw;
        }

        @Override
        public CoordinatedStateManager getCoordinatedStateManager() {
            return null;
        }

        @Override
        public ClusterConnection getConnection() {
            return null;
        }

        @Override
        public MetaTableLocator getMetaTableLocator() {
            return null;
        }

        @Override
        public ServerName getServerName() {
            return ServerName.valueOf(this.hostname, 1234, 1L);
        }

        @Override
        public void abort(String why, Throwable e) {
        }

        @Override
        public boolean isAborted() {
            return false;
        }

        @Override
        public void stop(String why) {
        }

        @Override
        public boolean isStopped() {
            return false;
        }

        @Override
        public ChoreService getChoreService() {
            return null;
        }
    }

    static class DummyNodeFailoverWorker
    extends Thread {
        private SortedMap<String, SortedSet<String>> logZnodesMap;
        Server server;
        private String deadRsZnode;
        ReplicationQueues rq;

        public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
            this.deadRsZnode = znode;
            this.server = s;
            this.rq = ReplicationFactory.getReplicationQueues(this.server.getZooKeeper(), this.server.getConfiguration(), this.server);
            this.rq.init(this.server.getServerName().toString());
        }

        @Override
        public void run() {
            try {
                this.logZnodesMap = this.rq.claimQueues(this.deadRsZnode);
                this.server.abort("Done with testing", null);
            }
            catch (Exception e) {
                LOG.error((Object)"Got exception while running NodeFailoverWorker", (Throwable)e);
            }
            finally {
                latch.countDown();
            }
        }

        private int isLogZnodesMapPopulated() {
            Collection<SortedSet<String>> sets = this.logZnodesMap.values();
            if (sets.size() > 1) {
                throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
            }
            if (sets.size() == 1) {
                SortedSet<String> s = sets.iterator().next();
                for (String file : files) {
                    if (s.contains(file)) continue;
                    return 0;
                }
                return 1;
            }
            return 0;
        }
    }
}

