/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category(value={MediumTests.class})
public class TestReplicationSource {
    private static final Log LOG = LogFactory.getLog(TestReplicationSource.class);
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    private static FileSystem FS;
    private static Path oldLogDir;
    private static Path logDir;
    private static Configuration conf;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL.startMiniDFSCluster(1);
        FS = TEST_UTIL.getDFSCluster().getFileSystem();
        Path rootDir = TEST_UTIL.createRootDir();
        oldLogDir = new Path(rootDir, "oldWALs");
        if (FS.exists(oldLogDir)) {
            FS.delete(oldLogDir, true);
        }
        if (FS.exists(logDir = new Path(rootDir, "WALs"))) {
            FS.delete(logDir, true);
        }
    }

    @Test
    public void testLogMoving() throws Exception {
        Path logPath = new Path(logDir, "log");
        if (!FS.exists(logDir)) {
            FS.mkdirs(logDir);
        }
        if (!FS.exists(oldLogDir)) {
            FS.mkdirs(oldLogDir);
        }
        WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
        for (int i = 0; i < 3; ++i) {
            byte[] b = Bytes.toBytes(Integer.toString(i));
            KeyValue kv = new KeyValue(b, b, b);
            WALEdit edit = new WALEdit();
            edit.add(kv);
            WALKey key = new WALKey(b, TableName.valueOf(b), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID);
            writer.append(new WAL.Entry(key, edit));
            writer.sync();
        }
        writer.close();
        WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
        WAL.Entry entry = reader.next();
        Assert.assertNotNull((Object)entry);
        Path oldLogPath = new Path(oldLogDir, "log");
        FS.rename(logPath, oldLogPath);
        entry = reader.next();
        Assert.assertNotNull((Object)entry);
        entry = reader.next();
        entry = reader.next();
        Assert.assertNull((Object)entry);
        reader.close();
    }

    @Test
    public void testTerminateTimeout() throws Exception {
        final ReplicationSource source = new ReplicationSource();
        HBaseInterClusterReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint(){

            @Override
            protected void doStart() {
                this.notifyStarted();
            }

            @Override
            protected void doStop() {
            }
        };
        replicationEndpoint.start();
        ReplicationPeers mockPeers = (ReplicationPeers)Mockito.mock(ReplicationPeers.class);
        Configuration testConf = HBaseConfiguration.create();
        testConf.setInt("replication.source.maxretriesmultiplier", 1);
        source.init(testConf, null, null, null, mockPeers, null, "testPeer", null, replicationEndpoint, null);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final Future<?> future = executor.submit(new Runnable(){

            @Override
            public void run() {
                source.terminate("testing source termination");
            }
        });
        long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000L);
        Waiter.waitFor(testConf, sleepForRetries * 2L, new Waiter.Predicate<Exception>(){

            @Override
            public boolean evaluate() throws Exception {
                return future.isDone();
            }
        });
    }

    static {
        conf = HBaseConfiguration.create();
    }
}

