/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.verification.VerificationMode;

@Category(value={MediumTests.class})
public class TestReplicationSource {
    private static final Log LOG = LogFactory.getLog(TestReplicationSource.class);
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    private static final HBaseTestingUtility TEST_UTIL_PEER = new HBaseTestingUtility();
    private static FileSystem FS;
    private static Path oldLogDir;
    private static Path logDir;
    private static Configuration conf;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL.startMiniDFSCluster(1);
        FS = TEST_UTIL.getDFSCluster().getFileSystem();
        Path rootDir = TEST_UTIL.createRootDir();
        oldLogDir = new Path(rootDir, "oldWALs");
        if (FS.exists(oldLogDir)) {
            FS.delete(oldLogDir, true);
        }
        if (FS.exists(logDir = new Path(rootDir, "WALs"))) {
            FS.delete(logDir, true);
        }
    }

    @Before
    public void setup() throws IOException {
        if (!FS.exists(logDir)) {
            FS.mkdirs(logDir);
        }
        if (!FS.exists(oldLogDir)) {
            FS.mkdirs(oldLogDir);
        }
        TestReplicationEndpoint.ReplicationEndpointForTest.contructedCount.set(0);
        TestReplicationEndpoint.ReplicationEndpointForTest.startedCount.set(0);
        TestReplicationEndpoint.ReplicationEndpointForTest.replicateCount.set(0);
        TestReplicationEndpoint.ReplicationEndpointForTest.stoppedCount.set(0);
        TestReplicationEndpoint.ReplicationEndpointForTest.lastEntries = null;
    }

    @After
    public void tearDown() throws IOException {
        if (FS.exists(oldLogDir)) {
            FS.delete(oldLogDir, true);
        }
        if (FS.exists(logDir)) {
            FS.delete(logDir, true);
        }
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TEST_UTIL_PEER.shutdownMiniHBaseCluster();
        TEST_UTIL.shutdownMiniHBaseCluster();
        TEST_UTIL.shutdownMiniDFSCluster();
    }

    @Test
    public void testLogMoving() throws Exception {
        Path logPath = new Path(logDir, "log");
        WALProvider.Writer writer = WALFactory.createWALWriter((FileSystem)FS, (Path)logPath, (Configuration)TEST_UTIL.getConfiguration());
        for (int i = 0; i < 3; ++i) {
            byte[] b = Bytes.toBytes((String)Integer.toString(i));
            KeyValue kv = new KeyValue(b, b, b);
            WALEdit edit = new WALEdit();
            edit.add((Cell)kv);
            WALKey key = new WALKey(b, TableName.valueOf((byte[])b), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID);
            writer.append(new WAL.Entry(key, edit));
            writer.sync();
        }
        writer.close();
        WAL.Reader reader = WALFactory.createReader((FileSystem)FS, (Path)logPath, (Configuration)TEST_UTIL.getConfiguration());
        WAL.Entry entry = reader.next();
        Assert.assertNotNull((Object)entry);
        Path oldLogPath = new Path(oldLogDir, "log");
        FS.rename(logPath, oldLogPath);
        entry = reader.next();
        Assert.assertNotNull((Object)entry);
        entry = reader.next();
        entry = reader.next();
        Assert.assertNull((Object)entry);
        reader.close();
    }

    @Test
    public void testTerminateTimeout() throws Exception {
        final ReplicationSource source = new ReplicationSource();
        HBaseInterClusterReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint(){

            protected void doStart() {
                this.notifyStarted();
            }

            protected void doStop() {
            }
        };
        replicationEndpoint.start();
        ReplicationPeers mockPeers = (ReplicationPeers)Mockito.mock(ReplicationPeers.class);
        Configuration testConf = HBaseConfiguration.create();
        testConf.setInt("replication.source.maxretriesmultiplier", 1);
        source.init(testConf, null, null, null, mockPeers, null, "testPeer", null, (ReplicationEndpoint)replicationEndpoint, null);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final Future<?> future = executor.submit(new Runnable(){

            @Override
            public void run() {
                source.terminate("testing source termination");
            }
        });
        long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000L);
        Waiter.waitFor((Configuration)testConf, (long)(sleepForRetries * 2L), (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return future.isDone();
            }
        });
    }

    private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
        for (int i = 0; i < numEntries; ++i) {
            byte[] b = Bytes.toBytes((String)Integer.toString(i));
            KeyValue kv = new KeyValue(b, b, b);
            WALEdit edit = new WALEdit();
            edit.add((Cell)kv);
            WALKey key = new WALKey(b, TableName.valueOf((byte[])b), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID);
            TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
            scopes.put(b, 1);
            key.setScopes(scopes);
            writer.append(new WAL.Entry(key, edit));
            writer.sync();
        }
        writer.close();
    }

    private long getPosition(WALFactory wals, Path log2, int numEntries) throws IOException {
        WAL.Reader reader = wals.createReader(FS, log2);
        for (int i = 0; i < numEntries; ++i) {
            reader.next();
        }
        return reader.getPosition();
    }

    @Test
    public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception {
        int numWALEntries = 5;
        conf.setInt("replication.source.nb.capacity", 5);
        Mocks mocks = new Mocks();
        final TestReplicationEndpoint.ReplicationEndpointForTest endpoint = new TestReplicationEndpoint.ReplicationEndpointForTest(){

            public WALEntryFilter getWALEntryfilter() {
                return null;
            }
        };
        WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
        Path log1 = new Path(logDir, "log.1");
        Path log2 = new Path(logDir, "log.2");
        WALProvider.Writer writer1 = WALFactory.createWALWriter((FileSystem)FS, (Path)log1, (Configuration)TEST_UTIL.getConfiguration());
        WALProvider.Writer writer2 = WALFactory.createWALWriter((FileSystem)FS, (Path)log2, (Configuration)TEST_UTIL.getConfiguration());
        this.appendEntries(writer1, 3);
        this.appendEntries(writer2, 2);
        long pos = this.getPosition(wals, log2, 2);
        ReplicationSource source = mocks.createReplicationSourceWithMocks((ReplicationEndpoint)endpoint);
        source.run();
        source.enqueueLog(log1);
        source.enqueueLog(log2);
        Waiter.waitFor((Configuration)conf, (long)20000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return TestReplicationEndpoint.ReplicationEndpointForTest.replicateCount.get() > 0;
            }
        });
        ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class);
        ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class);
        ((ReplicationSourceManager)Mockito.verify((Object)mocks.manager, (VerificationMode)VerificationModeFactory.times((int)1))).logPositionAndCleanOldLogs((Path)pathCaptor.capture(), Matchers.anyString(), ((Long)positionCaptor.capture()).longValue(), Matchers.anyBoolean(), Matchers.anyBoolean());
        Assert.assertTrue((TestReplicationEndpoint.ReplicationEndpointForTest.lastEntries.size() == 5 ? 1 : 0) != 0);
        MatcherAssert.assertThat((Object)((Path)pathCaptor.getValue()), (Matcher)CoreMatchers.is((Object)log2));
        MatcherAssert.assertThat((Object)((Long)positionCaptor.getValue()), (Matcher)CoreMatchers.is((Object)pos));
    }

    @Test
    public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception {
        Mocks mocks = new Mocks();
        TestReplicationEndpoint.ReplicationEndpointForTest endpoint = new TestReplicationEndpoint.ReplicationEndpointForTest();
        final ReplicationSource source = mocks.createReplicationSourceWithMocks((ReplicationEndpoint)endpoint);
        WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
        Path log1 = new Path(logDir, "log.1");
        final Path log2 = new Path(logDir, "log.2");
        WALFactory.createWALWriter((FileSystem)FS, (Path)log1, (Configuration)TEST_UTIL.getConfiguration()).close();
        WALFactory.createWALWriter((FileSystem)FS, (Path)log2, (Configuration)TEST_UTIL.getConfiguration()).close();
        final long startPos = this.getPosition(wals, log2, 0);
        source.run();
        source.enqueueLog(log1);
        source.enqueueLog(log2);
        Waiter.waitFor((Configuration)conf, (long)20000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return log2.equals((Object)source.getLastLoggedPath()) && source.getLastLoggedPosition() >= startPos;
            }
        });
        ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class);
        ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class);
        ((ReplicationSourceManager)Mockito.verify((Object)mocks.manager, (VerificationMode)VerificationModeFactory.times((int)1))).logPositionAndCleanOldLogs((Path)pathCaptor.capture(), Matchers.anyString(), ((Long)positionCaptor.capture()).longValue(), Matchers.anyBoolean(), Matchers.anyBoolean());
        MatcherAssert.assertThat((Object)((Path)pathCaptor.getValue()), (Matcher)CoreMatchers.is((Object)log2));
        MatcherAssert.assertThat((Object)((Long)positionCaptor.getValue()), (Matcher)CoreMatchers.is((Object)startPos));
    }

    @Test
    public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exception {
        Mocks mocks = new Mocks();
        TableName replicatedTable = TableName.valueOf((String)"replicated_table");
        Map cfs = Collections.singletonMap(replicatedTable, Collections.emptyList());
        Mockito.when((Object)mocks.peer.getTableCFs()).thenReturn(cfs);
        WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
        Path log1 = new Path(logDir, "log.1");
        final Path log2 = new Path(logDir, "log.2");
        WALProvider.Writer writer1 = WALFactory.createWALWriter((FileSystem)FS, (Path)log1, (Configuration)TEST_UTIL.getConfiguration());
        WALProvider.Writer writer2 = WALFactory.createWALWriter((FileSystem)FS, (Path)log2, (Configuration)TEST_UTIL.getConfiguration());
        this.appendEntries(writer1, 3);
        this.appendEntries(writer2, 2);
        final long pos = this.getPosition(wals, log2, 2);
        TestReplicationEndpoint.ReplicationEndpointForTest endpoint = new TestReplicationEndpoint.ReplicationEndpointForTest();
        final ReplicationSource source = mocks.createReplicationSourceWithMocks((ReplicationEndpoint)endpoint);
        source.enqueueLog(log1);
        source.enqueueLog(log2);
        source.run();
        Waiter.waitFor((Configuration)conf, (long)20000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return log2.equals((Object)source.getLastLoggedPath()) && source.getLastLoggedPosition() >= pos;
            }
        });
        ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class);
        ArgumentCaptor positionCaptor = ArgumentCaptor.forClass(Long.class);
        ((ReplicationSourceManager)Mockito.verify((Object)mocks.manager, (VerificationMode)VerificationModeFactory.times((int)1))).logPositionAndCleanOldLogs((Path)pathCaptor.capture(), Matchers.anyString(), ((Long)positionCaptor.capture()).longValue(), Matchers.anyBoolean(), Matchers.anyBoolean());
        MatcherAssert.assertThat((Object)((Path)pathCaptor.getValue()), (Matcher)CoreMatchers.is((Object)log2));
        MatcherAssert.assertThat((Object)((Long)positionCaptor.getValue()), (Matcher)CoreMatchers.is((Object)pos));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testServerShutdownRecoveredQueue() throws Exception {
        try {
            conf.set("hbase.wal.provider", "defaultProvider");
            conf.setInt("replication.sleep.before.failover", 2000);
            conf.set("hbase.regionserver.impl", ShutdownDelayRegionServer.class.getName());
            MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
            TEST_UTIL_PEER.startMiniCluster(1);
            HRegionServer serverA = cluster.getRegionServer(0);
            final ReplicationSourceManager managerA = ((Replication)serverA.getReplicationSourceService()).getReplicationManager();
            HRegionServer serverB = cluster.getRegionServer(1);
            final ReplicationSourceManager managerB = ((Replication)serverB.getReplicationSourceService()).getReplicationManager();
            ReplicationAdmin replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
            String peerId = "TestPeer";
            replicationAdmin.addPeer("TestPeer", new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()), null);
            Waiter.waitFor((Configuration)conf, (long)20000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

                public boolean evaluate() throws Exception {
                    return !managerA.getSources().isEmpty() && !managerB.getSources().isEmpty();
                }
            });
            replicationAdmin.disablePeer("TestPeer");
            cluster.stopRegionServer(serverA.getServerName());
            Waiter.waitFor((Configuration)conf, (long)20000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

                public boolean evaluate() throws Exception {
                    return managerB.getOldSources().size() == 1;
                }
            });
            final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
            serverC.waitForServerOnline();
            Waiter.waitFor((Configuration)conf, (long)20000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

                public boolean evaluate() throws Exception {
                    return serverC.getReplicationSourceService() != null;
                }
            });
            final ReplicationSourceManager managerC = ((Replication)serverC.getReplicationSourceService()).getReplicationManager();
            Assert.assertEquals((long)0L, (long)managerC.getOldSources().size());
            cluster.stopRegionServer(serverB.getServerName());
            Waiter.waitFor((Configuration)conf, (long)20000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

                public boolean evaluate() throws Exception {
                    return managerC.getOldSources().size() == 2;
                }
            });
            replicationAdmin.enablePeer("TestPeer");
            Waiter.waitFor((Configuration)conf, (long)20000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

                public boolean evaluate() throws Exception {
                    return managerC.getOldSources().size() == 0;
                }
            });
        }
        finally {
            conf.set("hbase.regionserver.impl", HRegionServer.class.getName());
        }
    }

    static {
        conf = TEST_UTIL.getConfiguration();
    }

    public static class ShutdownDelayRegionServer
    extends HRegionServer {
        public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
            super(conf);
        }

        public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException, InterruptedException {
            super(conf, csm);
        }

        protected void stopServiceThreads() {
            LOG.info((Object)"Adding a delay to the regionserver shutdown");
            try {
                Thread.sleep(2000L);
            }
            catch (InterruptedException ex) {
                LOG.error((Object)"Interrupted while sleeping");
            }
            super.stopServiceThreads();
        }
    }

    private static final class Mocks {
        private final ReplicationSourceManager manager = (ReplicationSourceManager)Mockito.mock(ReplicationSourceManager.class);
        private final ReplicationQueues queues = (ReplicationQueues)Mockito.mock(ReplicationQueues.class);
        private final ReplicationPeers peers = (ReplicationPeers)Mockito.mock(ReplicationPeers.class);
        private final MetricsSource metrics = (MetricsSource)Mockito.mock(MetricsSource.class);
        private final ReplicationPeer peer = (ReplicationPeer)Mockito.mock(ReplicationPeer.class);
        private final ReplicationEndpoint.Context context = (ReplicationEndpoint.Context)Mockito.mock(ReplicationEndpoint.Context.class);

        private Mocks() {
            Mockito.when((Object)this.peers.getStatusOfPeer(Matchers.anyString())).thenReturn((Object)true);
            Mockito.when((Object)this.context.getReplicationPeer()).thenReturn((Object)this.peer);
        }

        ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint) throws IOException {
            ReplicationSource source = new ReplicationSource();
            endpoint.init(this.context);
            source.init(conf, FS, this.manager, this.queues, this.peers, (Stoppable)Mockito.mock(Stoppable.class), "testPeerClusterZnode", UUID.randomUUID(), endpoint, this.metrics);
            return source;
        }
    }
}

