/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.tests;

import com.mapr.db.MapRDB;
import com.mapr.db.cdc.tests.TestCDPSBinaryWithCluster;
import com.mapr.db.cdc.tests.TestCDPSUtil;
import com.mapr.org.apache.hadoop.hbase.util.Bytes;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.Document;
import org.ojai.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class TestCDPSOpenFormatBinaryWithCluster
extends BaseTest {
    private static long TestStartTime = System.currentTimeMillis();
    private static long NotValidTime = -1L;
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSOpenFormatBinaryWithCluster.class);
    private static final String ROW = "row";
    private static final String FAM = "fam";
    private static final String COL = "col";
    private static final int FAMCOUNT = 3;
    private static final int MAXVERSION = 5;
    private static int NumRows1 = 3;
    private static int NumRows2 = 5;
    private static int NumCols = 2;

    @BeforeClass
    public static void startupBeforeClass() throws Exception {
        System.out.println("--- On single node cluster without other workload, these tests takes total about 10 minutes, please wait ---\nTestCDPSOpenFormatBinaryWithCluster#testBinaryConsumer ------ 10 minutes\n");
    }

    @AfterClass
    public static void cleanupAfterClass() throws IOException, Exception {
        System.out.println("Done!");
    }

    public static void verifyId(String expectedRowstr, Value idVal) {
        _logger.info("get ID: " + idVal.asJsonString() + "\n");
        byte[] idValueBytes = idVal.getBinary().array();
        String idValueStr = new String(idValueBytes);
        if (!expectedRowstr.equals(idValueStr)) {
            _logger.error(expectedRowstr + ": wrong fieldValue " + idVal.asJsonString() + ", expect " + expectedRowstr + ", got " + idValueStr);
        }
    }

    public static void verifyOpTime(String rowstr, long opTime) {
        if (opTime <= TestStartTime) {
            _logger.error(rowstr + ": opTime " + opTime + " should be larger than test start time " + TestStartTime);
            Assert.assertTrue((boolean)false);
        }
    }

    public static void verifyOneMut(String rowstr, Value mut, String expectedPath, String expectedFieldOp, long expectedVersion, byte[] expectedValue) {
        ByteBuffer fvBuf;
        String fieldOp;
        _logger.info("get Mut: " + mut.asJsonString() + "\n");
        Map mutMap = mut.getMap();
        String fieldPath = (String)mutMap.get("$fieldPath");
        if (!expectedPath.equals(fieldPath)) {
            _logger.error(rowstr + ": Wrong fieldPath, expect " + expectedPath + ", got " + fieldPath);
            Assert.assertTrue((boolean)false);
        }
        if (!expectedFieldOp.equals(fieldOp = (String)mutMap.get("$fieldOp"))) {
            _logger.error(rowstr + ": Wrong fieldOp, expect " + expectedFieldOp + ", got " + fieldOp);
            Assert.assertTrue((boolean)false);
        }
        long fieldVersion = ((Double)mutMap.get("$fieldVersion")).longValue();
        if (expectedVersion == NotValidTime) {
            if (fieldVersion <= TestStartTime || fieldVersion >= Long.MAX_VALUE) {
                _logger.error(rowstr + ": fieldVersion " + fieldVersion + " should be larger than test start time, less than Long.MAX_VALUE " + TestStartTime);
                Assert.assertTrue((boolean)false);
            }
        } else if (fieldVersion != expectedVersion) {
            _logger.error(rowstr + ": wrong fieldVersion, expect " + expectedVersion + ", got " + fieldVersion);
            Assert.assertTrue((boolean)false);
        }
        if ((fvBuf = (ByteBuffer)mutMap.get("$fieldValue")) == null) {
            if (expectedValue != null) {
                _logger.info("wrong fieldValue, expect " + Bytes.toStringBinary((byte[])expectedValue) + ", get null ");
                Assert.assertTrue((boolean)false);
            }
        } else {
            byte[] fieldValue = fvBuf.array();
            if (expectedValue == null) {
                _logger.info("wrong fieldValue, expect null, get " + Bytes.toStringBinary((byte[])fieldValue));
                Assert.assertTrue((boolean)false);
            }
            if (!Arrays.equals(expectedValue, fieldValue)) {
                _logger.error(rowstr + ": wrong fieldValue " + (fieldValue == null ? "null" : Bytes.toStringBinary((byte[])fieldValue)) + ", expect " + (expectedValue == null ? "null" : Bytes.toStringBinary((byte[])expectedValue)) + ", got " + (fieldValue == null ? "null" : Bytes.toStringBinary((byte[])fieldValue)));
                Assert.assertTrue((boolean)false);
            }
        }
    }

    public static void verifyDeleteHead(String strRowId, Document doc) throws IOException {
        TestCDPSOpenFormatBinaryWithCluster.verifyHead(strRowId, "$RECORD_DELETE", doc);
    }

    public static void verifyUpdateHead(String strRowId, Document doc) throws IOException {
        TestCDPSOpenFormatBinaryWithCluster.verifyHead(strRowId, "$RECORD_UPDATE", doc);
    }

    public static void verifyHead(String strRowId, String RowOpType, Document doc) throws IOException {
        TestCDPSOpenFormatBinaryWithCluster.verifyHeadWithoutOpTime(strRowId, RowOpType, doc);
        long opTime = doc.getLong("$opTime");
        TestCDPSOpenFormatBinaryWithCluster.verifyOpTime(strRowId, opTime);
    }

    public static void verifyHeadWithoutOpTime(String strRowId, String RowOpType, Document doc) throws IOException {
        Value idVal = doc.getId();
        TestCDPSOpenFormatBinaryWithCluster.verifyId(strRowId, idVal);
        String opType = doc.getString("$opType");
        if (!RowOpType.equals(opType)) {
            _logger.error(strRowId + ": wrong opType, expect " + RowOpType + ", got " + opType);
            Assert.assertTrue((boolean)false);
        }
    }

    public static void verifyInitPuts(String strRowId, String cdr, boolean valIsString, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        int mutcount = mutObjs.size();
        int mutIdx = 0;
        Value oneMut = null;
        for (int fidx = 0; fidx < 3; ++fidx) {
            String strfam = FAM + fidx;
            for (int cidx = 0; cidx < NumCols; ++cidx) {
                String strcol = COL + cidx;
                String strval = null;
                byte[] byteVal = null;
                if (valIsString) {
                    strval = strRowId + "-" + strfam + strcol + "vCurrentTime";
                    byteVal = strval.getBytes();
                } else {
                    byteVal = Bytes.toBytes((long)1000L);
                }
                oneMut = (Value)mutObjs.get(mutIdx);
                TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, oneMut, strfam + "." + strcol, "$PUT", NotValidTime, byteVal);
                ++mutIdx;
                for (long ts = 3L; ts >= 0L; --ts) {
                    byteVal = null;
                    if (valIsString) {
                        strval = strRowId + "-" + strfam + strcol + "v" + ts;
                        byteVal = strval.getBytes();
                    } else {
                        byteVal = Bytes.toBytes((long)ts);
                    }
                    oneMut = (Value)mutObjs.get(mutIdx);
                    TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, oneMut, strfam + "." + strcol, "$PUT", ts, byteVal);
                    ++mutIdx;
                }
            }
        }
        if (3 * NumCols * 5 != mutcount) {
            _logger.error(strRowId + ": wrong number of mutations, expect " + 3 * NumCols + ", got " + mutIdx);
            Assert.assertTrue((boolean)false);
        }
    }

    public static void verifyInitPutsWithCols(String strRowId, String cdr, boolean valIsString, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        int mutcount = mutObjs.size();
        int mutIdx = 0;
        Value oneMut = null;
        ArrayList<boolean[]> replCFCols = new ArrayList<boolean[]>();
        boolean[] fam0cols = new boolean[]{false, false};
        replCFCols.add(0, fam0cols);
        boolean[] fam1cols = new boolean[]{false, true};
        replCFCols.add(1, fam1cols);
        boolean[] fam2cols = new boolean[]{true, true};
        replCFCols.add(2, fam2cols);
        for (int fidx = 0; fidx < 3; ++fidx) {
            String strfam = FAM + fidx;
            for (int cidx = 0; cidx < NumCols; ++cidx) {
                String strcol = COL + cidx;
                if (!((boolean[])replCFCols.get(fidx))[cidx]) continue;
                String strval = null;
                byte[] byteVal = null;
                if (valIsString) {
                    strval = strRowId + "-" + strfam + strcol + "vCurrentTime";
                    byteVal = strval.getBytes();
                } else {
                    byteVal = Bytes.toBytes((long)1000L);
                }
                oneMut = (Value)mutObjs.get(mutIdx);
                TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, oneMut, strfam + "." + strcol, "$PUT", NotValidTime, byteVal);
                ++mutIdx;
                for (long ts = 3L; ts >= 0L; --ts) {
                    byteVal = null;
                    if (valIsString) {
                        strval = strRowId + "-" + strfam + strcol + "v" + ts;
                        byteVal = strval.getBytes();
                    } else {
                        byteVal = Bytes.toBytes((long)ts);
                    }
                    oneMut = (Value)mutObjs.get(mutIdx);
                    TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, oneMut, strfam + "." + strcol, "$PUT", ts, byteVal);
                    ++mutIdx;
                }
            }
        }
        if (mutIdx != mutcount) {
            _logger.error(strRowId + ": wrong number of mutations, expect " + mutIdx + ", got " + mutcount);
            Assert.assertTrue((boolean)false);
        }
    }

    public static void verifyMutations(String strRowId, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        int mutcount = mutObjs.size();
        int mutIdx = 0;
        int cidx = 1;
        String strcol = COL + cidx;
        long ts3 = 3L;
        long ts6 = 6L;
        Value mut = null;
        for (int fidx = 0; fidx < 3; ++fidx) {
            String strfam = FAM + fidx;
            mut = (Value)mutObjs.get(mutIdx);
            TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, strfam + "." + strcol, "$DELETE_EXACT", ts3, null);
            mut = (Value)mutObjs.get(++mutIdx);
            String strval = "-mutated-value" + fidx + cidx;
            TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, strfam + "." + strcol, "$PUT", ts6, strval.getBytes());
        }
    }

    public static void verifyOnePut(String strRowId, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        if (1 != mutObjs.size()) {
            _logger.error(strRowId + ": wrong number of mutations, expect 1, got " + mutObjs.size());
            Assert.assertTrue((boolean)false);
        }
        Value mut = (Value)mutObjs.get(0);
        String strval = strRowId + "-fam0col0vCurrentTime2";
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, "fam0.col0", "$PUT", NotValidTime, strval.getBytes());
    }

    public static void verifyOnePutWithCol(String strRowId, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info("bucket repl: " + strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        if (2 != mutObjs.size()) {
            _logger.error(strRowId + ": wrong number of mutations, expect 2, got " + mutObjs.size());
            Assert.assertTrue((boolean)false);
        }
        Value mut = (Value)mutObjs.get(0);
        String strval = strRowId + "-fam1col1vCurrentTime2";
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, "fam1.col1", "$PUT", NotValidTime, strval.getBytes());
        mut = (Value)mutObjs.get(1);
        strval = strRowId + "-fam2col0vCurrentTime2";
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, "fam2.col0", "$PUT", NotValidTime, strval.getBytes());
    }

    public static void verifyGeneralPut(String strRowId, String cdr, String fam, String col, byte[] byteVal, long ts, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        if (1 != mutObjs.size()) {
            _logger.error(strRowId + ": wrong number of mutations, expect 1, got " + mutObjs.size());
            Assert.assertTrue((boolean)false);
        }
        Value oneMut = (Value)mutObjs.get(0);
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, oneMut, fam + "." + col, "$PUT", NotValidTime, byteVal);
    }

    public static void verifyAllFamOrRow(String strRowId, String[] famcol, String op, long opTime, byte[] value, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        if (2 != mutObjs.size()) {
            _logger.error(strRowId + ": wrong number of mutations, expect 2, got " + mutObjs.size());
            Assert.assertTrue((boolean)false);
        }
        Value mut = (Value)mutObjs.get(0);
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, famcol[0], op, opTime, value);
        mut = (Value)mutObjs.get(1);
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, famcol[1], op, opTime, value);
    }

    public static void verifyColOpWithCol(String strRowId, String op, long opTime, byte[] value, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        if (2 != mutObjs.size()) {
            _logger.error(strRowId + ": wrong number of mutations, expect 2, got " + mutObjs.size());
            Assert.assertTrue((boolean)false);
        }
        Value mut = (Value)mutObjs.get(0);
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, "fam1.col1", op, opTime, value);
        mut = (Value)mutObjs.get(1);
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, "fam2.col1", op, opTime, value);
    }

    public static void verifyColOp(String strRowId, String fam, String col, String op, long opTime, byte[] value, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        if (1 != mutObjs.size()) {
            _logger.error(strRowId + ": wrong number of mutations, expect 1, got " + mutObjs.size());
            Assert.assertTrue((boolean)false);
        }
        Value oneMut = (Value)mutObjs.get(0);
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, oneMut, fam + "." + col, op, opTime, value);
    }

    public static void verifyFamOpWithCol(String strRowId, String fam, String op, long opTime, byte[] value, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        if (1 != mutObjs.size()) {
            _logger.error(strRowId + ": wrong number of mutations, expect 1, got " + mutObjs.size());
            Assert.assertTrue((boolean)false);
        }
        Value mut = (Value)mutObjs.get(0);
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, mut, fam, op, opTime, value);
    }

    public static void verifyFamOp(String strRowId, String fam, String op, long opTime, byte[] value, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatBinaryWithCluster.verifyUpdateHead(strRowId, doc);
        List mutObjs = doc.getList("$mutations");
        if (1 != mutObjs.size()) {
            _logger.error(strRowId + ": wrong number of mutations, expect 1, got " + mutObjs.size());
            Assert.assertTrue((boolean)false);
        }
        Value oneMut = (Value)mutObjs.get(0);
        TestCDPSOpenFormatBinaryWithCluster.verifyOneMut(strRowId, oneMut, fam, op, opTime, value);
    }

    public static void verifyRowDelete(String strRowId, String cdr) throws IOException {
        Document doc = MapRDB.newDocument((String)cdr);
        _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        TestCDPSOpenFormatBinaryWithCluster.verifyDeleteHead(strRowId, doc);
    }

    private void verifyInitCopy(List<ConsumerRecord<byte[], String>> getList, boolean isDebug) throws Exception {
        String strRowId;
        int ridx;
        ConsumerRecord<byte[], String> crec = null;
        String cdr = null;
        for (ridx = 0; ridx < NumRows1; ++ridx) {
            crec = getList.get(ridx);
            cdr = new String((String)crec.value());
            strRowId = ROW + ridx;
            TestCDPSOpenFormatBinaryWithCluster.verifyInitPuts(strRowId, cdr, true, isDebug);
        }
        for (ridx = NumRows1; ridx < NumRows2; ++ridx) {
            crec = getList.get(ridx);
            cdr = new String((String)crec.value());
            strRowId = ROW + ridx;
            TestCDPSOpenFormatBinaryWithCluster.verifyInitPuts(strRowId, cdr, false, isDebug);
        }
    }

    private void verifyBucketRepl(List<ConsumerRecord<byte[], String>> getList, boolean isDebug) throws Exception {
        ConsumerRecord<byte[], String> crec = null;
        String cdr = null;
        String strRowId = "row0";
        crec = getList.get(0);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyOnePut(strRowId, cdr, isDebug);
        crec = getList.get(1);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyColOp(strRowId, "fam0", "col1", "$DELETE_EXACT", NotValidTime, null, cdr, isDebug);
        crec = getList.get(2);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyColOp(strRowId, "fam0", "col0", "$DELETE_EXACT", 1L, null, cdr, isDebug);
        crec = getList.get(3);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyColOp(strRowId, "fam1", "col0", "$DELETE", NotValidTime, null, cdr, isDebug);
        crec = getList.get(4);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyColOp(strRowId, "fam1", "col1", "$DELETE", 2L, null, cdr, isDebug);
        crec = getList.get(5);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyFamOp(strRowId, "fam1", "$DELETE", NotValidTime, null, cdr, isDebug);
        crec = getList.get(6);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyFamOp(strRowId, "fam2", "$DELETE", 2L, null, cdr, isDebug);
        crec = getList.get(7);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyFamOp(strRowId, "fam2", "$DELETE", 3L, null, cdr, isDebug);
        crec = getList.get(8);
        cdr = new String((String)crec.value());
        strRowId = "row1";
        TestCDPSOpenFormatBinaryWithCluster.verifyRowDelete(strRowId, cdr);
        String strval = "row2-fam0col0vCurrentTime-append-value1";
        crec = getList.get(9);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyGeneralPut("row2", cdr, "fam0", "col0", strval.getBytes(), NotValidTime, isDebug);
        byte[] intbyte = Bytes.toBytes((long)1002L);
        crec = getList.get(10);
        cdr = new String((String)crec.value());
        TestCDPSOpenFormatBinaryWithCluster.verifyGeneralPut("row3", cdr, "fam0", "col0", intbyte, NotValidTime, isDebug);
        crec = getList.get(11);
        cdr = new String((String)crec.value());
    }

    @Test
    public void testBinaryConsumer() throws Exception {
        boolean isDebug = true;
        String BinTableName1 = "/tmp/btOpenFormatJson1";
        String Chglogdst1 = "/tmp/btChgOpenFormatJson1";
        String TopicFullName = Chglogdst1 + ":btOpenFormatJson1";
        long timeStartTest = System.nanoTime();
        TestCDPSBinaryWithCluster.createTable(BinTableName1);
        long timeDoneJsonSrc = System.nanoTime();
        TestCDPSUtil.replaceStreamTable(Chglogdst1, true, 1);
        TestCDPSUtil.setupCDPSReplicaWithColumns(BinTableName1, Chglogdst1, TopicFullName, false, null);
        long timeDoneDst = System.nanoTime();
        List<ConsumerRecord<byte[], String>> getListBin = null;
        List<ConsumerRecord<byte[], String>> getListBinGrp = null;
        KafkaConsumer<byte[], String> consumerBin = null;
        KafkaConsumer<byte[], String> consumerBinGrp = null;
        consumerBin = TestCDPSUtil.startStringConsumer(TopicFullName, false, null);
        consumerBinGrp = TestCDPSUtil.startStringConsumer(TopicFullName, true, "cdcOfBin1");
        getListBin = TestCDPSUtil.fetchStringData(NumRows2, consumerBin);
        getListBinGrp = TestCDPSUtil.fetchStringData(NumRows2, consumerBinGrp);
        Assert.assertEquals((long)NumRows2, (long)getListBin.size());
        Assert.assertEquals((long)NumRows2, (long)getListBinGrp.size());
        long consumerGotInitData = System.nanoTime();
        this.verifyInitCopy(getListBin, true);
        this.verifyInitCopy(getListBinGrp, true);
        long consumerAnalyzeInitData = System.nanoTime();
        TestCDPSBinaryWithCluster.deleteRecFromTable(BinTableName1);
        long putMoreSrcData = System.nanoTime();
        getListBin = TestCDPSUtil.fetchStringDataWithBreak(12, consumerBin, 20);
        getListBinGrp = TestCDPSUtil.fetchStringDataWithBreak(12, consumerBinGrp, 20);
        Assert.assertEquals((long)12L, (long)getListBin.size());
        Assert.assertEquals((long)12L, (long)getListBinGrp.size());
        long consumerGotNewData = System.nanoTime();
        this.verifyBucketRepl(getListBin, true);
        this.verifyBucketRepl(getListBinGrp, true);
        long consumerAnalyzeMoreData = System.nanoTime();
        System.out.println("\nTime(ns) createSetLoadSrcTable start:" + timeStartTest + "\t end:" + timeDoneJsonSrc + "\t taken:" + (timeDoneJsonSrc - timeDoneJsonSrc) + " \n \nTime(ns) createSetupChangelog  start:" + timeDoneJsonSrc + "\t end:" + timeDoneDst + "\t taken:" + (timeDoneDst - timeDoneJsonSrc) + " \n \nTime(ns) copyRegionPhase       start:" + timeDoneDst + "\t end:" + consumerGotInitData + "\t taken:" + (consumerGotInitData - timeDoneDst) + " \n \nTime(ns) analyzeInitData       start:" + consumerGotInitData + "\t end:" + consumerAnalyzeInitData + "\t taken:" + (consumerAnalyzeInitData - consumerGotInitData) + " \n \nTime(ns) putMoreSrcData        start:" + consumerAnalyzeInitData + "\t end:" + putMoreSrcData + "\t taken:" + (putMoreSrcData - consumerAnalyzeInitData) + " \n \nTime(ns) bucketReplPhase       start:" + putMoreSrcData + "\t end:" + consumerGotNewData + "\t taken:" + (consumerGotNewData - putMoreSrcData) + " \n \nTime(ns) analyzeInitData       start:" + consumerGotNewData + "\t end:" + consumerAnalyzeMoreData + "\t taken:" + (consumerAnalyzeMoreData - consumerGotNewData) + " \n ");
    }

    @Test
    public void testReplWithColumns() throws Exception {
        String strRowId;
        int ridx;
        boolean isDebug = true;
        String tsrc1 = "/tmp/btOpenFormatJsonCol1";
        String chglogdst1 = "/tmp/btChgOpenFormatJsonCol1";
        String topicFullName = chglogdst1 + ":btOpenFormatJsonCol1";
        TestCDPSBinaryWithCluster.createTable(tsrc1);
        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        String replPaths = "fam1:col1,fam2";
        TestCDPSUtil.setupCDPSReplicaWithColumns(tsrc1, chglogdst1, topicFullName, false, replPaths);
        List<ConsumerRecord<byte[], String>> getListBin = null;
        ConsumerRecord<byte[], String> crec = null;
        KafkaConsumer<byte[], String> consumerBin = null;
        String cdr = null;
        consumerBin = TestCDPSUtil.startStringConsumer(topicFullName, false, null);
        getListBin = TestCDPSUtil.fetchStringData(NumRows2, consumerBin);
        Assert.assertEquals((long)NumRows2, (long)getListBin.size());
        for (ridx = 0; ridx < NumRows1; ++ridx) {
            crec = getListBin.get(ridx);
            cdr = (String)crec.value();
            strRowId = ROW + ridx;
            TestCDPSOpenFormatBinaryWithCluster.verifyInitPutsWithCols(strRowId, cdr, true, isDebug);
        }
        for (ridx = NumRows1; ridx < NumRows2; ++ridx) {
            crec = getListBin.get(ridx);
            cdr = (String)crec.value();
            strRowId = ROW + ridx;
            TestCDPSOpenFormatBinaryWithCluster.verifyInitPutsWithCols(strRowId, cdr, false, isDebug);
        }
        String strRowId2 = "row0";
        TestCDPSBinaryWithCluster.colOpOnTable(tsrc1);
        _logger.info("\n------ Done Col Ops, wait for the data been replicated --------\n");
        getListBin = TestCDPSUtil.fetchStringDataWithBreak(8, consumerBin, 20);
        Assert.assertEquals((long)8L, (long)getListBin.size());
        cdr = (String)getListBin.get(0).value();
        TestCDPSOpenFormatBinaryWithCluster.verifyOnePutWithCol(strRowId2, cdr, isDebug);
        cdr = (String)getListBin.get(1).value();
        TestCDPSOpenFormatBinaryWithCluster.verifyColOpWithCol(strRowId2, "$DELETE_EXACT", NotValidTime, null, cdr, isDebug);
        cdr = (String)getListBin.get(2).value();
        TestCDPSOpenFormatBinaryWithCluster.verifyColOpWithCol(strRowId2, "$DELETE_EXACT", 1L, null, cdr, isDebug);
        cdr = (String)getListBin.get(3).value();
        TestCDPSOpenFormatBinaryWithCluster.verifyColOpWithCol(strRowId2, "$DELETE", 2L, null, cdr, isDebug);
        cdr = (String)getListBin.get(4).value();
        TestCDPSOpenFormatBinaryWithCluster.verifyColOpWithCol(strRowId2, "$DELETE", NotValidTime, null, cdr, isDebug);
        String[] fields = new String[]{"fam1.col1", "fam2"};
        cdr = (String)getListBin.get(5).value();
        TestCDPSOpenFormatBinaryWithCluster.verifyAllFamOrRow(strRowId2, fields, "$DELETE", 3L, null, cdr, isDebug);
        cdr = (String)getListBin.get(6).value();
        TestCDPSOpenFormatBinaryWithCluster.verifyAllFamOrRow(strRowId2, fields, "$DELETE", NotValidTime, null, cdr, isDebug);
        cdr = (String)getListBin.get(7).value();
        TestCDPSOpenFormatBinaryWithCluster.verifyAllFamOrRow(strRowId2, fields, "$DELETE", NotValidTime, null, cdr, isDebug);
    }
}

