/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.tests;

import com.mapr.db.cdc.tests.TestCDPSUtil;
import com.mapr.db.tests.utils.DBTests;
import com.mapr.org.apache.hadoop.hbase.util.Bytes;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.KeyValue;
import org.ojai.store.cdc.ChangeDataReader;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeEvent;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.store.cdc.ChangeOp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class TestCDPSBinaryWithCluster
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSBinaryWithCluster.class);
    private static String BinTableName1 = "/tmp/bsrc1";
    private static String Chglogdst1 = "/tmp/chglogdstbin1";
    private static String TopicFullName = Chglogdst1 + ":bsrc1";
    private static final String ROW = "row";
    private static final String FAM = "fam";
    private static final String COL = "col";
    private static final int FAMCOUNT = 3;
    private static final int MAXVERSION = 5;
    private static int NumRows1 = 3;
    private static int NumRows2 = 5;
    private static int NumCols = 2;

    public static void createTable(String tableName) throws Exception {
        String cmd = " hbase com.mapr.fs.hbase.test.TestDeletes " + tableName + " init";
        System.out.println("\n" + cmd);
        DBTests.ExecuteShellCmd((String)cmd);
    }

    public static void deleteRecFromTable(String tableName) throws Exception {
        String cmd = " hbase com.mapr.fs.hbase.test.TestDeletes " + tableName + " delete";
        System.out.println("\n" + cmd);
        DBTests.ExecuteShellCmd((String)cmd);
    }

    public static void colOpOnTable(String tableName) throws Exception {
        String cmd = " hbase com.mapr.fs.hbase.test.TestDeletes " + tableName + " colop";
        System.out.println("\n" + cmd);
        DBTests.ExecuteShellCmd((String)cmd);
    }

    @BeforeClass
    public static void startupBeforeClass() throws Exception {
        System.out.println("--- On single node cluster without other workload, these tests takes total about 3 minutes, please wait ---\nTestCDPSBinaryWithCluster#testBinaryConsumer ------ 10 minutes\n");
    }

    @AfterClass
    public static void cleanupAfterClass() throws IOException, Exception {
        System.out.println("Done!");
    }

    public static void verifyInitPutsThroughIterWithCols1(String strRowId, ChangeDataRecord cdr, boolean valIsString) throws IOException {
        Iterator cdrItr = cdr.iterator();
        ArrayList<boolean[]> replCFCols = new ArrayList<boolean[]>();
        boolean[] fam0cols = new boolean[]{false, false};
        replCFCols.add(0, fam0cols);
        boolean[] fam1cols = new boolean[]{false, true};
        replCFCols.add(1, fam1cols);
        boolean[] fam2cols = new boolean[]{true, true};
        replCFCols.add(2, fam2cols);
        for (int fidx = 0; fidx < 3; ++fidx) {
            String strfam = FAM + fidx;
            for (int cidx = 0; cidx < NumCols; ++cidx) {
                String strcol = COL + cidx;
                if (!((boolean[])replCFCols.get(fidx))[cidx]) continue;
                String strval = null;
                ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
                byte[] byteVal = null;
                if (valIsString) {
                    strval = strRowId + "-" + strfam + strcol + "vCurrentTime";
                    byteVal = strval.getBytes();
                } else {
                    byteVal = Bytes.toBytes((long)1000L);
                }
                TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, ChangeEvent.NODE, ChangeOp.PUT, 5L, strfam + "." + strcol, byteVal);
                for (long ts = 3L; ts >= 0L; --ts) {
                    cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
                    byteVal = null;
                    if (valIsString) {
                        strval = strRowId + "-" + strfam + strcol + "v" + ts;
                        byteVal = strval.getBytes();
                    } else {
                        byteVal = Bytes.toBytes((long)ts);
                    }
                    TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, ts, strfam + "." + strcol, byteVal);
                }
            }
        }
    }

    public static void verifyInitPutsThroughIter1(String strRowId, ChangeDataRecord cdr, boolean valIsString) throws IOException {
        Iterator cdrItr = cdr.iterator();
        for (int fidx = 0; fidx < 3; ++fidx) {
            String strfam = FAM + fidx;
            for (int cidx = 0; cidx < NumCols; ++cidx) {
                String strcol = COL + cidx;
                String strval = null;
                ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
                byte[] byteVal = null;
                if (valIsString) {
                    strval = strRowId + "-" + strfam + strcol + "vCurrentTime";
                    byteVal = strval.getBytes();
                } else {
                    byteVal = Bytes.toBytes((long)1000L);
                }
                TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, ChangeEvent.NODE, ChangeOp.PUT, 5L, strfam + "." + strcol, byteVal);
                for (long ts = 3L; ts >= 0L; --ts) {
                    cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
                    byteVal = null;
                    if (valIsString) {
                        strval = strRowId + "-" + strfam + strcol + "v" + ts;
                        byteVal = strval.getBytes();
                    } else {
                        byteVal = Bytes.toBytes((long)ts);
                    }
                    TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, ts, strfam + "." + strcol, byteVal);
                }
            }
        }
    }

    public static void verifyOneFamOpThroughReader1(String strRowId, String fam, ChangeOp op, long opTime, byte[] value, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        ChangeDataReader reader = cdr.getReader();
        long opServerTime = cdr.getServerTimestamp();
        ChangeNode cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.START_MAP, ChangeOp.MERGE, opServerTime, null, null);
        cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, op, opTime, fam, value);
        cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.END_MAP, ChangeOp.NULL, opServerTime, null, null);
    }

    public static void verifyOneOpThroughReader1(String strRowId, String fam, String col, ChangeOp op, long opTime, TestCDPSUtil.OpTimeVerifyMethod opMethod, byte[] value, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        ChangeDataReader reader = cdr.getReader();
        long opServerTime = cdr.getServerTimestamp();
        ChangeNode cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.START_MAP, ChangeOp.MERGE, opServerTime, null, null);
        cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.START_MAP, ChangeOp.MERGE, opServerTime, fam, null);
        cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEqualsBinary(cd, opMethod, ChangeEvent.NODE, op, opTime, col, value);
        cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.END_MAP, ChangeOp.NULL, opServerTime, fam, null);
        cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.END_MAP, ChangeOp.NULL, opServerTime, null, null);
    }

    public static void verifyInitPutsThroughReader1(String strRowId, ChangeDataRecord cdr, boolean valIsString, boolean isDebug) throws IOException {
        ChangeDataReader reader = cdr.getReader();
        long opTime = cdr.getOpTimestamp();
        ChangeNode cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.START_MAP, ChangeOp.MERGE, opTime, null, null);
        String strval = null;
        for (int fidx = 0; fidx < 3; ++fidx) {
            String strfam = FAM + fidx;
            cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
            TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.START_MAP, ChangeOp.MERGE, opTime, strfam, null);
            for (int cidx = 0; cidx < NumCols; ++cidx) {
                String strcol = COL + cidx;
                byte[] byteVal = null;
                if (valIsString) {
                    strval = strRowId + "-" + strfam + strcol + "vCurrentTime";
                    byteVal = strval.getBytes();
                } else {
                    byteVal = Bytes.toBytes((long)1000L);
                }
                cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
                TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, ChangeEvent.NODE, ChangeOp.PUT, 5L, strcol, byteVal);
                for (long ts = 3L; ts >= 0L; --ts) {
                    if (valIsString) {
                        strval = strRowId + "-" + strfam + strcol + "v" + ts;
                        byteVal = strval.getBytes();
                    } else {
                        byteVal = Bytes.toBytes((long)ts);
                    }
                    cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
                    TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, ts, strcol, byteVal);
                }
            }
            cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
            TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.END_MAP, ChangeOp.NULL, opTime, strfam, null);
        }
        cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.END_MAP, ChangeOp.NULL, opTime, null, null);
    }

    public static void verifyInitPuts(String strRowId, ChangeDataRecord cdr, boolean valIsStr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.SET);
        TestCDPSBinaryWithCluster.verifyInitPutsThroughIter1(strRowId, cdr, valIsStr);
        TestCDPSBinaryWithCluster.verifyInitPutsThroughReader1(strRowId, cdr, valIsStr, isDebug);
    }

    public static void verifyInitPutWithCols(String strRowId, ChangeDataRecord cdr, boolean valIsStr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.SET);
        TestCDPSBinaryWithCluster.verifyInitPutsThroughIterWithCols1(strRowId, cdr, valIsStr);
    }

    public static void verifyMutations(String strRowId, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.MERGE);
        Iterator cdrItr = cdr.iterator();
        int cidx = 1;
        String strcol = COL + cidx;
        long ts3 = 3L;
        long ts6 = 6L;
        for (int fidx = 0; fidx < 3; ++fidx) {
            String strfam = FAM + fidx;
            ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
            TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE_EXACT, ts3, strfam + "." + strcol, null);
            cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
            String strval = "-mutated-value" + fidx + cidx;
            TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, ts6, strfam + "." + strcol, strval.getBytes());
        }
        ChangeDataReader reader = cdr.getReader();
        long opServerTime = cdr.getServerTimestamp();
        ChangeNode cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.START_MAP, ChangeOp.MERGE, opServerTime, null, null);
        for (int fidx = 0; fidx < 3; ++fidx) {
            String strfam = FAM + fidx;
            cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
            TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.START_MAP, ChangeOp.MERGE, opServerTime, strfam, null);
            cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
            TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE_EXACT, ts3, strcol, null);
            String strval = "-mutated-value" + fidx + cidx;
            cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
            TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, ts6, strcol, strval.getBytes());
            cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
            TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.END_MAP, ChangeOp.NULL, opServerTime, strfam, null);
        }
        cd = TestCDPSUtil.moveToNextNodeReaderBinary(reader);
        TestCDPSUtil.assertNodeEquals(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.END_MAP, ChangeOp.NULL, opServerTime, null, null);
    }

    public static void verifyOnePut(String strRowId, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.MERGE);
        Iterator cdrItr = cdr.iterator();
        String strval = strRowId + "-fam0col0vCurrentTime2";
        ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, cdr.getOpTimestamp(), "fam0.col0", strval.getBytes());
        TestCDPSBinaryWithCluster.verifyOneOpThroughReader1(strRowId, "fam0", "col0", ChangeOp.PUT, cdr.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, "row0-fam0col0vCurrentTime2".getBytes(), cdr, isDebug);
    }

    public static void verifyOnePutWithCol(String strRowId, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.MERGE);
        Iterator cdrItr = cdr.iterator();
        String strval = strRowId + "-fam1col1vCurrentTime2";
        ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, cdr.getOpTimestamp(), "fam1.col1", strval.getBytes());
        strval = strRowId + "-fam2col0vCurrentTime2";
        cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, cdr.getOpTimestamp(), "fam2.col0", strval.getBytes());
    }

    public static void verifyGeneralPut(String strRowId, ChangeDataRecord cdr, String fam, String col, byte[] byteVal, long ts, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.MERGE);
        Iterator cdrItr = cdr.iterator();
        ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.PUT, ts, fam + "." + col, byteVal);
        TestCDPSBinaryWithCluster.verifyOneOpThroughReader1(strRowId, fam, col, ChangeOp.PUT, ts, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, byteVal, cdr, isDebug);
    }

    public static void verifyColOpWithCol(String strRowId, String fam, String col, ChangeOp op, long opTime, TestCDPSUtil.OpTimeVerifyMethod opMethod, byte[] value, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.MERGE);
        Iterator cdrItr = cdr.iterator();
        ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, opMethod, ChangeEvent.NODE, op, opTime, "fam1.col1", value);
        cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, opMethod, ChangeEvent.NODE, op, opTime, "fam2.col1", value);
    }

    public static void verifyColOp(String strRowId, String fam, String col, ChangeOp op, long opTime, TestCDPSUtil.OpTimeVerifyMethod opMethod, byte[] value, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.MERGE);
        TestCDPSBinaryWithCluster.verifyOneOpThroughReader1(strRowId, fam, col, op, opTime, opMethod, value, cdr, isDebug);
        Iterator cdrItr = cdr.iterator();
        ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, opMethod, ChangeEvent.NODE, op, opTime, fam + "." + col, value);
    }

    public static void verifyFamOpWithCol(String strRowId, String fam, ChangeOp op, long opTime, byte[] value, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.MERGE);
        Iterator cdrItr = cdr.iterator();
        ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, op, opTime, "fam1", value);
        cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, op, opTime, "fam2", value);
    }

    public static void verifyFamOp(String strRowId, String fam, ChangeOp op, long opTime, byte[] value, ChangeDataRecord cdr, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        TestCDPSUtil.verifyHeadBinary(cdr, false, strRowId.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.MERGE);
        TestCDPSBinaryWithCluster.verifyOneFamOpThroughReader1(strRowId, fam, op, opTime, null, cdr, isDebug);
        Iterator cdrItr = cdr.iterator();
        ChangeNode cd = (ChangeNode)((KeyValue)cdrItr.next()).getValue();
        TestCDPSUtil.assertNodeEqualsBinary(cd, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, op, opTime, fam, value);
    }

    @Test
    public void testBinaryConsumer() throws Exception {
        String strRowId;
        int ridx;
        boolean isDebug = false;
        long timeStartTest = System.nanoTime();
        TestCDPSBinaryWithCluster.createTable(BinTableName1);
        long timeDoneJsonSrc = System.nanoTime();
        TestCDPSUtil.replaceStreamTable(Chglogdst1, true, 1);
        TestCDPSUtil.setupCDPSReplicaWithColumns(BinTableName1, Chglogdst1, TopicFullName, false, null);
        long timeDoneDst = System.nanoTime();
        List<ConsumerRecord<byte[], ChangeDataRecord>> getListBin = null;
        ConsumerRecord<byte[], ChangeDataRecord> crec = null;
        KafkaConsumer<byte[], ChangeDataRecord> consumerBin = null;
        ChangeDataRecord cdr = null;
        consumerBin = TestCDPSUtil.startConsumer(TopicFullName);
        getListBin = TestCDPSUtil.fetchChangeDataWithBreak(NumRows2, consumerBin, 60);
        Assert.assertEquals((long)NumRows2, (long)getListBin.size());
        long consumerGotInitData = System.nanoTime();
        for (ridx = 0; ridx < NumRows1; ++ridx) {
            crec = getListBin.get(ridx);
            cdr = (ChangeDataRecord)crec.value();
            strRowId = ROW + ridx;
            System.out.println("\n------ " + strRowId + "--------\n");
            TestCDPSUtil.printChangeRecHeader(cdr);
            TestCDPSUtil.printChangeDataThroughIter(cdr);
            TestCDPSUtil.printChangeDataThroughReaderBinary(cdr);
            TestCDPSBinaryWithCluster.verifyInitPuts(strRowId, cdr, true, isDebug);
        }
        for (ridx = NumRows1; ridx < NumRows2; ++ridx) {
            crec = getListBin.get(ridx);
            cdr = (ChangeDataRecord)crec.value();
            strRowId = ROW + ridx;
            System.out.println("\n------ " + strRowId + "--------\n");
            TestCDPSUtil.printChangeRecHeader(cdr);
            TestCDPSUtil.printChangeDataThroughIter(cdr);
            TestCDPSUtil.printChangeDataThroughReaderBinary(cdr);
            TestCDPSBinaryWithCluster.verifyInitPuts(strRowId, cdr, false, isDebug);
        }
        long consumerAnalyzeInitData = System.nanoTime();
        String strRowId2 = "row0";
        TestCDPSBinaryWithCluster.deleteRecFromTable(BinTableName1);
        long putMoreSrcData = System.nanoTime();
        getListBin = TestCDPSUtil.fetchChangeDataWithBreak(12, consumerBin, 20);
        long consumerGotNewData = System.nanoTime();
        cdr = (ChangeDataRecord)getListBin.get(0).value();
        TestCDPSBinaryWithCluster.verifyOnePut(strRowId2, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(1).value();
        TestCDPSBinaryWithCluster.verifyColOp(strRowId2, "fam0", "col1", ChangeOp.DELETE_EXACT, cdr.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.LessEqualGivenValue, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(2).value();
        TestCDPSBinaryWithCluster.verifyColOp(strRowId2, "fam0", "col0", ChangeOp.DELETE_EXACT, 1L, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(3).value();
        TestCDPSBinaryWithCluster.verifyColOp(strRowId2, "fam1", "col0", ChangeOp.DELETE, cdr.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(4).value();
        TestCDPSBinaryWithCluster.verifyColOp(strRowId2, "fam1", "col1", ChangeOp.DELETE, 2L, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(5).value();
        TestCDPSBinaryWithCluster.verifyFamOp(strRowId2, "fam1", ChangeOp.DELETE, cdr.getOpTimestamp(), null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(6).value();
        TestCDPSBinaryWithCluster.verifyFamOp(strRowId2, "fam2", ChangeOp.DELETE, 2L, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(7).value();
        TestCDPSBinaryWithCluster.verifyFamOp(strRowId2, "fam2", ChangeOp.DELETE, 3L, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(8).value();
        strRowId2 = "row1";
        TestCDPSUtil.verifyRowDeleteBinary(strRowId2, cdr);
        String strval = "row2-fam0col0vCurrentTime-append-value1";
        cdr = (ChangeDataRecord)getListBin.get(9).value();
        TestCDPSBinaryWithCluster.verifyGeneralPut("row2", cdr, "fam0", "col0", strval.getBytes(), cdr.getOpTimestamp(), isDebug);
        byte[] intbyte = Bytes.toBytes((long)1002L);
        cdr = (ChangeDataRecord)getListBin.get(10).value();
        TestCDPSBinaryWithCluster.verifyGeneralPut("row3", cdr, "fam0", "col0", intbyte, cdr.getOpTimestamp(), isDebug);
        cdr = (ChangeDataRecord)getListBin.get(11).value();
        TestCDPSBinaryWithCluster.verifyMutations("row2", cdr, isDebug);
        long consumerAnalyzeMoreData = System.nanoTime();
        System.out.println("\nTime(ns) createSetLoadSrcTable start:" + timeStartTest + "\t end:" + timeDoneJsonSrc + "\t taken:" + (timeDoneJsonSrc - timeDoneJsonSrc) + " \n \nTime(ns) createSetupChangelog  start:" + timeDoneJsonSrc + "\t end:" + timeDoneDst + "\t taken:" + (timeDoneDst - timeDoneJsonSrc) + " \n \nTime(ns) copyRegionPhase       start:" + timeDoneDst + "\t end:" + consumerGotInitData + "\t taken:" + (consumerGotInitData - timeDoneDst) + " \n \nTime(ns) analyzeInitData       start:" + consumerGotInitData + "\t end:" + consumerAnalyzeInitData + "\t taken:" + (consumerAnalyzeInitData - consumerGotInitData) + " \n \nTime(ns) putMoreSrcData        start:" + consumerAnalyzeInitData + "\t end:" + putMoreSrcData + "\t taken:" + (putMoreSrcData - consumerAnalyzeInitData) + " \n \nTime(ns) bucketReplPhase       start:" + putMoreSrcData + "\t end:" + consumerGotNewData + "\t taken:" + (consumerGotNewData - putMoreSrcData) + " \n \nTime(ns) analyzeInitData       start:" + consumerGotNewData + "\t end:" + consumerAnalyzeMoreData + "\t taken:" + (consumerAnalyzeMoreData - consumerGotNewData) + " \n ");
    }

    @Test
    public void testReplWithColumns() throws Exception {
        String strRowId;
        int ridx;
        boolean isDebug = false;
        String tsrc1 = "/tmp/tcolsrc1";
        String chglogdst1 = "/tmp/chglogcoldstbin1";
        String topicFullName = chglogdst1 + ":tcolsrc1";
        TestCDPSBinaryWithCluster.createTable(tsrc1);
        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        String replPaths = "fam1:col1,fam2";
        TestCDPSUtil.setupCDPSReplicaWithColumns(tsrc1, chglogdst1, topicFullName, false, replPaths);
        List<ConsumerRecord<byte[], ChangeDataRecord>> getListBin = null;
        ConsumerRecord<byte[], ChangeDataRecord> crec = null;
        KafkaConsumer<byte[], ChangeDataRecord> consumerBin = null;
        ChangeDataRecord cdr = null;
        consumerBin = TestCDPSUtil.startConsumer(topicFullName);
        getListBin = TestCDPSUtil.fetchChangeDataWithBreak(NumRows2, consumerBin, 60);
        Assert.assertEquals((long)NumRows2, (long)getListBin.size());
        for (ridx = 0; ridx < NumRows1; ++ridx) {
            crec = getListBin.get(ridx);
            cdr = (ChangeDataRecord)crec.value();
            strRowId = ROW + ridx;
            System.out.println("\n------ " + strRowId + "--------\n");
            TestCDPSUtil.printChangeRecHeader(cdr);
            TestCDPSUtil.printChangeDataThroughIter(cdr);
            TestCDPSUtil.printChangeDataThroughReaderBinary(cdr);
            TestCDPSBinaryWithCluster.verifyInitPutWithCols(strRowId, cdr, true, isDebug);
        }
        for (ridx = NumRows1; ridx < NumRows2; ++ridx) {
            crec = getListBin.get(ridx);
            cdr = (ChangeDataRecord)crec.value();
            strRowId = ROW + ridx;
            System.out.println("\n------ " + strRowId + "--------\n");
            TestCDPSUtil.printChangeRecHeader(cdr);
            TestCDPSUtil.printChangeDataThroughIter(cdr);
            TestCDPSUtil.printChangeDataThroughReaderBinary(cdr);
            TestCDPSBinaryWithCluster.verifyInitPutWithCols(strRowId, cdr, false, isDebug);
        }
        String strRowId2 = "row0";
        TestCDPSBinaryWithCluster.colOpOnTable(tsrc1);
        System.out.println("\n------ Done Col Ops, wait for the data been replicated --------\n");
        getListBin = TestCDPSUtil.fetchChangeDataWithBreak(7, consumerBin, 20);
        Assert.assertEquals((long)7L, (long)getListBin.size());
        cdr = (ChangeDataRecord)getListBin.get(0).value();
        TestCDPSBinaryWithCluster.verifyOnePutWithCol(strRowId2, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(1).value();
        TestCDPSBinaryWithCluster.verifyColOpWithCol(strRowId2, null, null, ChangeOp.DELETE_EXACT, cdr.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.LessEqualGivenValue, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(2).value();
        TestCDPSBinaryWithCluster.verifyColOpWithCol(strRowId2, null, null, ChangeOp.DELETE_EXACT, 1L, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(3).value();
        TestCDPSBinaryWithCluster.verifyColOpWithCol(strRowId2, null, null, ChangeOp.DELETE, 2L, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(4).value();
        TestCDPSBinaryWithCluster.verifyColOpWithCol(strRowId2, null, null, ChangeOp.DELETE, cdr.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(5).value();
        TestCDPSBinaryWithCluster.verifyFamOpWithCol(strRowId2, null, ChangeOp.DELETE, 3L, null, cdr, isDebug);
        cdr = (ChangeDataRecord)getListBin.get(6).value();
        TestCDPSBinaryWithCluster.verifyFamOpWithCol(strRowId2, null, ChangeOp.DELETE, cdr.getOpTimestamp(), null, cdr, isDebug);
    }
}

