/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={MediumTests.class})
public class TestReplicationEndpoint
extends TestReplicationBase {
    static final Log LOG = LogFactory.getLog(TestReplicationEndpoint.class);
    static int numRegionServers;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TestReplicationBase.setUpBeforeClass();
        utility2.shutdownMiniCluster();
        admin.removePeer("2");
        numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TestReplicationBase.tearDownAfterClass();
        Assert.assertTrue((ReplicationEndpointForTest.stoppedCount.get() > 0 ? 1 : 0) != 0);
    }

    @Before
    public void setup() throws FailedLogCloseException, IOException {
        ReplicationEndpointForTest.contructedCount.set(0);
        ReplicationEndpointForTest.startedCount.set(0);
        ReplicationEndpointForTest.replicateCount.set(0);
        ReplicationEndpointReturningFalse.replicated.set(false);
        ReplicationEndpointForTest.lastEntries = null;
        for (JVMClusterUtil.RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
            utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
        }
    }

    @Test(timeout=120000L)
    public void testCustomReplicationEndpoint() throws Exception {
        admin.addPeer("testCustomReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)).setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
        Waiter.waitFor(conf1, 60000L, new Waiter.Predicate<Exception>(){

            @Override
            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
            }
        });
        Waiter.waitFor(conf1, 60000L, new Waiter.Predicate<Exception>(){

            @Override
            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
            }
        });
        Assert.assertEquals((long)0L, (long)ReplicationEndpointForTest.replicateCount.get());
        this.doPut(Bytes.toBytes("row42"));
        Waiter.waitFor(conf1, 60000L, new Waiter.Predicate<Exception>(){

            @Override
            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.replicateCount.get() >= 1;
            }
        });
        TestReplicationEndpoint.doAssert(Bytes.toBytes("row42"));
        admin.removePeer("testCustomReplicationEndpoint");
    }

    @Test(timeout=120000L)
    public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
        Assert.assertEquals((long)0L, (long)ReplicationEndpointForTest.replicateCount.get());
        Assert.assertTrue((!ReplicationEndpointReturningFalse.replicated.get() ? 1 : 0) != 0);
        int peerCount = admin.getPeersCount();
        String id = "testReplicationEndpointReturnsFalseOnReplicate";
        admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate", new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)).setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
        if (admin.getPeersCount() <= peerCount) {
            LOG.info((Object)("Waiting on peercount to go up from " + peerCount));
            Threads.sleep(100L);
        }
        this.doPut(row);
        Waiter.waitFor(conf1, 60000L, new Waiter.Predicate<Exception>(){

            @Override
            public boolean evaluate() throws Exception {
                int count = ReplicationEndpointForTest.replicateCount.get();
                LOG.info((Object)("count=" + count));
                return ReplicationEndpointReturningFalse.replicated.get();
            }
        });
        if (ReplicationEndpointReturningFalse.ex.get() != null) {
            throw ReplicationEndpointReturningFalse.ex.get();
        }
        admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
    }

    @Test(timeout=120000L)
    public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
        admin.addPeer("testWALEntryFilterFromReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)).setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
        try (Connection connection = ConnectionFactory.createConnection(conf1);){
            this.doPut(connection, Bytes.toBytes("row1"));
            this.doPut(connection, row);
            this.doPut(connection, Bytes.toBytes("row2"));
        }
        Waiter.waitFor(conf1, 60000L, new Waiter.Predicate<Exception>(){

            @Override
            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.replicateCount.get() >= 1;
            }
        });
        Assert.assertNull((Object)ReplicationEndpointWithWALEntryFilter.ex.get());
        admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
    }

    private void doPut(byte[] row) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf1);){
            this.doPut(connection, row);
        }
    }

    private void doPut(Connection connection, byte[] row) throws IOException {
        try (Table t = connection.getTable(tableName);){
            Put put = new Put(row);
            put.add(famName, row, row);
            t.put(put);
        }
    }

    private static void doAssert(byte[] row) throws Exception {
        if (ReplicationEndpointForTest.lastEntries == null) {
            return;
        }
        Assert.assertEquals((long)1L, (long)ReplicationEndpointForTest.lastEntries.size());
        ArrayList<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
        Assert.assertEquals((long)1L, (long)cells.size());
        Assert.assertTrue((boolean)Bytes.equals(((Cell)cells.get(0)).getRowArray(), ((Cell)cells.get(0)).getRowOffset(), ((Cell)cells.get(0)).getRowLength(), row, 0, row.length));
    }

    public static class ReplicationEndpointWithWALEntryFilter
    extends ReplicationEndpointForTest {
        static AtomicReference<Exception> ex = new AtomicReference<Object>(null);

        @Override
        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                super.replicate(replicateContext);
                TestReplicationEndpoint.doAssert(TestReplicationBase.row);
            }
            catch (Exception e) {
                ex.set(e);
            }
            return true;
        }

        @Override
        public WALEntryFilter getWALEntryfilter() {
            return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter(){

                @Override
                public WAL.Entry filter(WAL.Entry entry) {
                    ArrayList<Cell> cells = entry.getEdit().getCells();
                    int size = cells.size();
                    for (int i = size - 1; i >= 0; --i) {
                        Cell cell = cells.get(i);
                        if (Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), TestReplicationBase.row, 0, TestReplicationBase.row.length)) continue;
                        cells.remove(i);
                    }
                    return entry;
                }
            });
        }
    }

    public static class ReplicationEndpointReturningFalse
    extends ReplicationEndpointForTest {
        static int COUNT = 10;
        static AtomicReference<Exception> ex = new AtomicReference<Object>(null);
        static AtomicBoolean replicated = new AtomicBoolean(false);

        @Override
        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                TestReplicationEndpoint.doAssert(TestReplicationBase.row);
            }
            catch (Exception e) {
                ex.set(e);
            }
            super.replicate(replicateContext);
            LOG.info((Object)("Replicated " + TestReplicationBase.row + ", count=" + replicateCount.get()));
            replicated.set(replicateCount.get() > COUNT);
            return replicated.get();
        }
    }

    public static class ReplicationEndpointForTest
    extends BaseReplicationEndpoint {
        static UUID uuid = UUID.randomUUID();
        static AtomicInteger contructedCount = new AtomicInteger();
        static AtomicInteger startedCount = new AtomicInteger();
        static AtomicInteger stoppedCount = new AtomicInteger();
        static AtomicInteger replicateCount = new AtomicInteger();
        static volatile List<WAL.Entry> lastEntries = null;

        public ReplicationEndpointForTest() {
            contructedCount.incrementAndGet();
        }

        @Override
        public UUID getPeerUUID() {
            return uuid;
        }

        @Override
        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            replicateCount.incrementAndGet();
            lastEntries = replicateContext.entries;
            return true;
        }

        @Override
        protected void doStart() {
            startedCount.incrementAndGet();
            this.notifyStarted();
        }

        @Override
        protected void doStop() {
            stoppedCount.incrementAndGet();
            this.notifyStopped();
        }
    }
}

