/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.tests;

import com.mapr.db.JsonTable;
import com.mapr.db.MapRDB;
import com.mapr.db.Table;
import com.mapr.db.cdc.tests.TestCDPSCLIWithCluster;
import com.mapr.db.cdc.tests.TestCDPSJSONWithCluster;
import com.mapr.db.cdc.tests.TestCDPSOpenFormatBinaryWithCluster;
import com.mapr.db.cdc.tests.TestCDPSUtil;
import com.mapr.db.impl.AdminImpl;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.rowcol.DBDocumentImpl;
import com.mapr.db.rowcol.KeyValue;
import com.mapr.db.rowcol.KeyValueBuilder;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.Document;
import org.ojai.Value;
import org.ojai.store.DocumentMutation;
import org.ojai.store.DocumentStore;
import org.ojai.store.cdc.ChangeDataRecordType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class TestCDPSOpenFormatJSONWithCluster
extends BaseTest {
    // Wall-clock time in milliseconds, captured once when this class is initialized.
    private static long TestStartTime = System.currentTimeMillis();
    // Shared SLF4J logger for this test class.
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSOpenFormatJSONWithCluster.class);
    // Table paths and topic names used by the tests.
    // NOTE(review): "Mcf"/"Scf" presumably mean multi-/single-column-family source tables — confirm against the test bodies.
    private static final String SRC_MCF_TABLE_NAME = "/tmp/jtOpenFormatJsonMcf";
    private static final String SRC_SCF_TABLE_NAME = "/tmp/jtOpenFormatJsonScf";
    private static final String DST_TABLE_NAME = "/tmp/chgOpenFormatJson";
    private static final String DST_MCF_TOPIC_NAME = "jtOpenFormatJsonMcf";
    private static final String DST_SCF_TOPIC_NAME = "jtOpenFormatJsonScf";
    // Prefix for generated row ids (the code that builds the ids is outside this view of the file).
    private static final String ROWID_PREFIX = "row";
    public static final String CONNECTION_URL = "ojai:mapr:";
    // Admin handle: assigned in startupBeforeClass(), closed in cleanupAfterClass().
    private static AdminImpl testAdmin = null;
    // Sizing/offset knobs for the tests; their consumers are not visible in this part of the file.
    private static int initMapNum = 7;
    private static int initMCFNum = 10;
    private static int initRowNum = 20;
    private static int bucketPutStartRow = 1;

    /**
     * Fetches the sample document from {@code TestCDPSUtil.getAllTypeRecord()} and stores it in
     * the given table under {@code rowid} via {@code insertOrReplace}.
     *
     * @param rowid row key to store the document under
     * @param jsonTable table that receives the document
     * @return the document that was handed to the table
     * @throws IOException declared by this method's signature
     */
    public static Document putDataWithAllTypes(String rowid, Table jsonTable) throws IOException {
        final Document allTypesDoc = TestCDPSUtil.getAllTypeRecord();
        jsonTable.insertOrReplace(rowid, allTypesDoc);
        return allTypesDoc;
    }

    /**
     * Stores a document with a single {@code "bytes"} field under a binary row key.
     * The field holds a 256-byte buffer filled with the byte values of 0..255 in order.
     *
     * @param rowid raw bytes of the row key
     * @param jsonTable table that receives the document
     * @return the document that was handed to the table
     * @throws IOException declared by this method's signature
     */
    public static Document putBinaryRow(byte[] rowid, Table jsonTable) throws IOException {
        final ByteBuffer payload = ByteBuffer.allocate(256);
        int next = 0;
        while (payload.hasRemaining()) {
            payload.put((byte)next);
            ++next;
        }
        // Reset the position so the whole buffer is readable when stored.
        payload.rewind();

        final DBDocumentImpl doc = new DBDocumentImpl();
        doc.set("bytes", payload);
        final String docJson = doc.asJsonString();
        _logger.info("SUUU add byte rec:" + docJson);

        final KeyValue binaryKey = KeyValueBuilder.initFrom((ByteBuffer)ByteBuffer.wrap(rowid));
        jsonTable.insertOrReplace((Value)binaryKey, (Document)doc);
        return doc;
    }

    /**
     * Creates the shared admin handle once before any test in this class runs.
     *
     * @throws IOException declared by this method's signature
     */
    @BeforeClass
    public static void startupBeforeClass() throws IOException {
        final AdminImpl admin = (AdminImpl)MapRDBImpl.newAdmin();
        testAdmin = admin;
    }

    /**
     * Closes the shared admin handle after all tests in this class have run.
     *
     * <p>The handle is only closed when it was actually created: if the {@code @BeforeClass}
     * setup failed before assigning it, closing unconditionally would raise a
     * NullPointerException on top of the real setup failure.
     *
     * @throws IOException declared by this method's signature
     * @throws Exception declared by this method's signature
     */
    @AfterClass
    public static void cleanupAfterClass() throws IOException, Exception {
        if (testAdmin != null) {
            testAdmin.close();
        }
        _logger.info("Done!");
    }

    /**
     * Builds the leading part of an expected change-data record: the {@code _id}, the
     * {@code $opType} (the record type's string form prefixed with {@code $}) and the
     * {@code $opTime} key, stopping right before the time value.
     *
     * @param strRowId row id placed in the {@code _id} field
     * @param recType record type whose {@code toString()} is used as the operation type
     * @return the expected record head
     */
    public static String getRecHead(String strRowId, ChangeDataRecordType recType) {
        final StringBuilder head = new StringBuilder("{\"_id\":\"");
        head.append(strRowId);
        head.append("\",\"$opType\":\"$");
        head.append(recType.toString());
        head.append("\",\"$opTime\":");
        return head.toString();
    }

    /**
     * Builds the expected record tail for a single {@code $DELETE} mutation on one field.
     *
     * @param fieldPath field path placed in the {@code $fieldPath} entry
     * @return the expected {@code $mutations} tail, including the closing brace of the record
     */
    public static String getOneMutFieldDelete(String fieldPath) {
        final String opening = ",\"$mutations\":[{\"$fieldPath\":\"";
        final String closing = "\",\"$fieldOp\":\"$DELETE\"}]}";
        return opening + fieldPath + closing;
    }

    /**
     * Builds the expected record tail for a single {@code $SET} mutation whose value is the
     * "replaced" map ({@code fRep1}/{@code fRep2}).
     *
     * @param fieldPath field path placed in the {@code $fieldPath} entry
     * @return the expected {@code $mutations} tail, including the closing brace of the record
     */
    public static String getOneMutReplaceCFMap(String fieldPath) {
        final StringBuilder tail = new StringBuilder(",\"$mutations\":[{\"$fieldPath\":\"");
        tail.append(fieldPath);
        tail.append("\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"fRep1\":\"replacedCF1\",\"fRep2\":{\"$numberInt\":400}}}]}");
        return tail.toString();
    }

    /**
     * Builds the expected record tail for a single {@code $SET} mutation whose value is the
     * "inserted" map ({@code fIns1}/{@code fIns2}).
     *
     * @param fieldPath field path placed in the {@code $fieldPath} entry
     * @return the expected {@code $mutations} tail, including the closing brace of the record
     */
    public static String getOneMutInsertCFMap(String fieldPath) {
        final String beforePath = ",\"$mutations\":[{\"$fieldPath\":\"";
        final String afterPath = "\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"fIns1\":\"insertedCF1\",\"fIns2\":{\"$numberInt\":500}}}]}";
        return new StringBuilder(beforePath).append(fieldPath).append(afterPath).toString();
    }

    /**
     * Builds the expected record tail for a single {@code $SET} mutation whose value is the
     * three-element "replacedItem" string array.
     *
     * @param fieldPath field path placed in the {@code $fieldPath} entry
     * @return the expected {@code $mutations} tail, including the closing brace of the record
     */
    public static String getOneMutReplaceSimpleArray(String fieldPath) {
        final String opening = ",\"$mutations\":[{\"$fieldPath\":\"";
        final String closing = "\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"replacedItem1\",\"replacedItem2\",\"replacedItem3\"]}]}";
        // String.valueOf keeps the "null" text that plain concatenation would produce.
        return opening.concat(String.valueOf(fieldPath)).concat(closing);
    }

    /**
     * Builds the expected record tail for a single {@code $SET} mutation whose value is the
     * three-element "insertedItem" string array.
     *
     * @param fieldPath field path placed in the {@code $fieldPath} entry
     * @return the expected {@code $mutations} tail, including the closing brace of the record
     */
    public static String getOneMutInsertSimpleArray(String fieldPath) {
        final StringBuilder tail = new StringBuilder();
        tail.append(",\"$mutations\":[{\"$fieldPath\":\"").append(fieldPath);
        tail.append("\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"insertedItem1\",\"insertedItem2\",\"insertedItem3\"]}]}");
        return tail.toString();
    }

    /**
     * Builds the expected record tail for a single {@code $SET} mutation whose value is the
     * string {@code replacedLeafNode}.
     *
     * @param fieldPath field path placed in the {@code $fieldPath} entry
     * @return the expected {@code $mutations} tail, including the closing brace of the record
     */
    public static String getOneMutReplaceSimpleField(String fieldPath) {
        final String pathEntry = ",\"$mutations\":[{\"$fieldPath\":\"" + fieldPath;
        final String setEntry = "\",\"$fieldOp\":\"$SET\",\"$fieldValue\":\"replacedLeafNode\"}]}";
        return pathEntry + setEntry;
    }

    /**
     * Builds the expected record tail for a single {@code $SET} mutation whose value is the
     * string {@code insertedLeafNode}.
     *
     * @param fieldPath field path placed in the {@code $fieldPath} entry
     * @return the expected {@code $mutations} tail, including the closing brace of the record
     */
    public static String getOneMutInsertSimpleField(String fieldPath) {
        final StringBuilder tail = new StringBuilder(",\"$mutations\":[{\"$fieldPath\":\"");
        return tail
                .append(fieldPath)
                .append("\",\"$fieldOp\":\"$SET\",\"$fieldValue\":\"insertedLeafNode\"}]}")
                .toString();
    }

    /**
     * Asserts that a change-data record starts with the expected head and ends with the
     * expected body. The part in between (the {@code $opTime} value that follows the head
     * built by {@code getRecHead}) is not compared.
     *
     * <p>Each mismatch is logged, and the assertion failure carries the expected and actual
     * text so that a failing run can be diagnosed from the test report alone. A {@code null}
     * record is reported as an assertion failure instead of a NullPointerException.
     *
     * @param recPrefix text the record must start with
     * @param recSuffix text the record must end with
     * @param cdr the change-data record text under test
     */
    public static void verifyData(String recPrefix, String recSuffix, String cdr) {
        Assert.assertNotNull("change data record is null; expected head:\n" + recPrefix, cdr);
        boolean prefixMatch = cdr.startsWith(recPrefix);
        boolean suffixMatch = cdr.endsWith(recSuffix);
        if (!prefixMatch) {
            _logger.error("head Mismatch! expect:\n" + recPrefix + ", get rec:\n" + cdr);
        }
        if (!suffixMatch) {
            _logger.error("body Mismatch! expect:\n" + recSuffix + ", get rec:\n" + cdr);
        }
        Assert.assertTrue("head Mismatch! expect:\n" + recPrefix + ", get rec:\n" + cdr, prefixMatch);
        Assert.assertTrue("body Mismatch! expect:\n" + recSuffix + ", get rec:\n" + cdr, suffixMatch);
    }

    /**
     * Verifies that the trimmed record is a RECORD_INSERT for {@code strRowId} whose
     * {@code $$document} ends with the expected all-types document.
     *
     * @param strRowId expected row id in the record head
     * @param incdr change-data record text; surrounding whitespace is trimmed before comparing
     */
    public static void verifyDataWithAllTypes(String strRowId, String incdr) {
        final String trimmed = incdr.trim();
        final String expectedBody = ",\"$$document\":{\"Date\":{\"$dateDay\":\"1970-01-05\"},\"Friends\":[\"Anurag\",\"Bharat\",{\"$numberInt\":10}],\"NAME\":\"ANURAG\",\"NULL\":null,\"Scores\":[{\"$numberInt\":10},{\"$numberInt\":20},{\"$numberInt\":30}],\"Time\":{\"$time\":\"19:46:40.000\"},\"binary3\":{\"$binary\":\"AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4vMDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5fYGFiYw==\"},\"boolean\":false,\"byte\":{\"$numberByte\":100},\"escapestring\":\"escape\\b\\f\\n\\r\\t\\\"\\\\string\",\"map\":{\"Array2\":[-2321232.1234312,{\"$numberLong\":-50000},{\"$numberInt\":10}],\"LIST\":[\"Field1\",{\"$numberInt\":500},5555.5555],\"LIST2\":[\"Field1\",{\"$numberInt\":500},5555.5555,[{\"$numberInt\":500},{\"$numberInt\":1000},{\"$numberInt\":1500},{\"$numberInt\":2000}]],\"boolarray\":[false,true,true],\"boolean\":true,\"byte\":{\"$numberByte\":100},\"double1A\":10.12345,\"double1B\":-3.793147,\"double2A\":10.1234567890123,\"double2B\":-3.79314716921907,\"double3A\":10.1234567890123,\"double3B\":-3.79314716921907,\"double4A\":10.1234567890123,\"double4B\":-3.79314716921907,\"float1A\":{\"$numberFloat\":10.1234},\"float1B\":{\"$numberFloat\":-1.358},\"float2A\":{\"$numberFloat\":10.1235},\"float2B\":{\"$numberFloat\":-1.35888},\"float3A\":{\"$numberFloat\":10.1235},\"float3B\":{\"$numberFloat\":-1.35888},\"float4A\":{\"$numberFloat\":10.1235},\"float4B\":{\"$numberFloat\":-1.35888},\"int\":{\"$numberInt\":50000},\"long\":{\"$numberLong\":12345678999},\"negdate\":{\"$dateDay\":\"1018-03-26\"},\"negtimestamp\":{\"$date\":\"1016-02-25T08:02:03.004Z\"},\"posdate\":{\"$dateDay\":\"2018-03-26\"},\"postime\":{\"$time\":\"15:16:17.018\"},\"postimestamp\":{\"$date\":\"2017-02-25T08:02:03.004Z\"},\"short\":{\"$numberShort\":10000},\"string\":\"string\"},\"string\":\"stringstrinstringstring\"}}";
        final String expectedHead = TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_INSERT);
        TestCDPSOpenFormatJSONWithCluster.verifyData(expectedHead, expectedBody, trimmed);
    }

    /**
     * Verifies that the trimmed record is a RECORD_INSERT for {@code strRowId} whose
     * {@code $$document} ends with the expected nested array/map document.
     *
     * @param strRowId expected row id in the record head
     * @param incdr change-data record text; surrounding whitespace is trimmed before comparing
     */
    public static void verifyArrayWithAllTypes(String strRowId, String incdr) {
        final String trimmed = incdr.trim();
        final String expectedBody = ",\"$$document\":{\"ArrayInArray\":[[\"student1\"],[\"G1\",\"G2\"],[{\"$numberInt\":91},{\"$numberInt\":92},{\"$numberInt\":93}]],\"arrayF\":[[\"fieldValue1\"],{\"L1MapF1\":\"L1MapF1Value1\",\"L1MapF2\":\"L1MapF2Value2\"},[{\"L2MapF11\":\"L2MapF11Value11\"},{\"L2MapF21\":\"L2MapF21Value21\",\"L2MapF22\":\"L2MapF21Value22\"}],[[\"L3ArrayValue111\"],[\"L3ArrayValue121\"],[\"L3ArrayValue131\",\"L3ArrayValue132\"]]],\"mapB\":{\"mapC\":{\"SimpleArray\":[\"spring\",null,\"fall\"]}},\"mapD\":{\"MapInArray\":[{\"name\":\"kid1\"},{\"name\":\"kid2\",\"score\":{\"$numberInt\":82}}]}}}";
        final String expectedHead = TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_INSERT);
        TestCDPSOpenFormatJSONWithCluster.verifyData(expectedHead, expectedBody, trimmed);
    }

    /**
     * Verifies that the trimmed record is a RECORD_INSERT for {@code strRowId} whose
     * {@code $$document} ends with the expected mixed map/array document, with fields in the
     * order written in this method's expected text.
     *
     * @param strRowId expected row id in the record head
     * @param incdr change-data record text; surrounding whitespace is trimmed before comparing
     */
    public static void verifyMixedCFsMCF(String strRowId, String incdr) {
        final String trimmed = incdr.trim();
        final String expectedBody = ",\"$$document\":{\"mapA\":{\"fA\":\"valueA\",\"mArray\":[{\"$numberInt\":100},{\"$numberInt\":200}]},\"mapB\":{\"fB\":{\"$numberInt\":111},\"mapC\":{\"fC\":true,\"mapF\":{\"fF\":\"valueF\",\"arrayL\":[\"L0\",\"L1\",\"L2\"]}},\"arrayJ\":[\"J0\",\"J1\",\"J2\"]},\"mapD\":{\"fD\":{\"$numberInt\":222},\"mapX\":{\"fX\":false}},\"arrayG\":[\"G0\",\"G1\",\"G2\"],\"arrayH\":[\"H0\",\"H1\",\"H2\"]}}";
        final String expectedHead = TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_INSERT);
        TestCDPSOpenFormatJSONWithCluster.verifyData(expectedHead, expectedBody, trimmed);
    }

    /**
     * Verifies that the trimmed record is a RECORD_INSERT for {@code strRowId} whose
     * {@code $$document} ends with the expected mixed map/array document, with fields in the
     * order written in this method's expected text.
     *
     * @param strRowId expected row id in the record head
     * @param incdr change-data record text; surrounding whitespace is trimmed before comparing
     */
    public static void verifyMixedCFsSCF(String strRowId, String incdr) {
        final String trimmed = incdr.trim();
        final String expectedBody = ",\"$$document\":{\"arrayG\":[\"G0\",\"G1\",\"G2\"],\"arrayH\":[\"H0\",\"H1\",\"H2\"],\"mapA\":{\"fA\":\"valueA\",\"mArray\":[{\"$numberInt\":100},{\"$numberInt\":200}]},\"mapB\":{\"arrayJ\":[\"J0\",\"J1\",\"J2\"],\"fB\":{\"$numberInt\":111},\"mapC\":{\"fC\":true,\"mapF\":{\"arrayL\":[\"L0\",\"L1\",\"L2\"],\"fF\":\"valueF\"}}},\"mapD\":{\"fD\":{\"$numberInt\":222},\"mapX\":{\"fX\":false}}}}";
        final String expectedHead = TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_INSERT);
        TestCDPSOpenFormatJSONWithCluster.verifyData(expectedHead, expectedBody, trimmed);
    }

    /**
     * Asserts that the trimmed record starts with the RECORD_INSERT head for {@code strRowId}.
     * Only the head is checked; the document body is not compared.
     *
     * @param strRowId expected row id in the record head
     * @param incdr change-data record text; surrounding whitespace is trimmed before comparing
     */
    public static void verifyBinaryRec(String strRowId, String incdr) {
        final String trimmed = incdr.trim();
        final String expectedHead = TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_INSERT);
        final boolean headMatches = trimmed.startsWith(expectedHead);
        Assert.assertTrue((boolean)headMatches);
    }

    /**
     * Verifies that the trimmed record is a RECORD_INSERT for {@code strRowId} whose
     * {@code $$document} ends with the expected simple map document.
     *
     * @param strRowId expected row id in the record head
     * @param incdr change-data record text; surrounding whitespace is trimmed before comparing
     */
    public static void verifySimpleMap(String strRowId, String incdr) {
        final String trimmed = incdr.trim();
        final String expectedBody = ",\"$$document\":{\"mapA\":{\"fA\":\"valueA\"},\"mapB\":{\"fB\":{\"$numberInt\":100},\"mapC\":{\"fC\":true}},\"mapD\":{\"fD\":\"valueD\"}}}";
        final String expectedHead = TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_INSERT);
        TestCDPSOpenFormatJSONWithCluster.verifyData(expectedHead, expectedBody, trimmed);
    }

    /**
     * Verifies that the record is a RECORD_INSERT for {@code strRowId} whose
     * {@code $$document} ends with the expected "replaced" map document. The record is
     * compared as given (no trimming).
     *
     * @param strRowId expected row id in the record head
     * @param cdr change-data record text to verify
     */
    public static void verifyReplaceRowMap(String strRowId, String cdr) {
        final String expectedBody = ",\"$$document\":{\"mapA\":{\"fA\":\"replacedValueA\"},\"mapB\":{\"fB\":{\"$numberInt\":200},\"mapC\":{\"fC\":false}},\"mapD\":{\"fD\":\"replacedValueD\"}}}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_INSERT),
                expectedBody,
                cdr);
    }

    /**
     * Asserts that the record starts with the RECORD_DELETE head for {@code strRowId}.
     * Only the head is checked.
     *
     * @param strRowId expected row id in the record head
     * @param cdr change-data record text to verify
     */
    public static void verifyRowDelete(String strRowId, String cdr) {
        final String expectedHead = TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_DELETE);
        final boolean headMatches = cdr.startsWith(expectedHead);
        Assert.assertTrue((boolean)headMatches);
    }

    /**
     * Verifies that the record is a RECORD_INSERT for {@code strRowId} whose
     * {@code $$document} ends with the expected "inserted" map document. The record is
     * compared as given (no trimming).
     *
     * @param strRowId expected row id in the record head
     * @param cdr change-data record text to verify
     */
    public static void verifyInsertRowMap(String strRowId, String cdr) {
        final String expectedBody = ",\"$$document\":{\"mapA\":{\"fA\":\"insertedValueA\"},\"mapB\":{\"fB\":{\"$numberInt\":300},\"mapC\":{\"fC\":true}},\"mapD\":{\"fD\":\"insertedValueD\"}}}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_INSERT),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mutation tail produced by {@code getOneMutReplaceCFMap(fieldPath)}.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath field path expected in the single mutation
     * @param cdr change-data record text to verify
     */
    public static void verifyReplaceCFMap(String strRowId, String fieldPath, String cdr) {
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                TestCDPSOpenFormatJSONWithCluster.getOneMutReplaceCFMap(fieldPath),
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mutation tail produced by {@code getOneMutInsertCFMap(fieldPath)}.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath field path expected in the single mutation
     * @param cdr change-data record text to verify
     */
    public static void verifyInsertCFMap(String strRowId, String fieldPath, String cdr) {
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                TestCDPSOpenFormatJSONWithCluster.getOneMutInsertCFMap(fieldPath),
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mutation tail produced by {@code getOneMutFieldDelete(fieldPath)}.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath field path expected in the single mutation
     * @param cdr change-data record text to verify
     */
    public static void verifyFieldDelete(String strRowId, String fieldPath, String cdr) {
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                TestCDPSOpenFormatJSONWithCluster.getOneMutFieldDelete(fieldPath),
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mutation tail produced by {@code getOneMutReplaceSimpleArray(fieldPath)}.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath field path expected in the single mutation
     * @param cdr change-data record text to verify
     */
    public static void verifyReplaceSimpleArray(String strRowId, String fieldPath, String cdr) {
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                TestCDPSOpenFormatJSONWithCluster.getOneMutReplaceSimpleArray(fieldPath),
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mutation tail produced by {@code getOneMutInsertSimpleArray(fieldPath)}.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath field path expected in the single mutation
     * @param cdr change-data record text to verify
     */
    public static void verifyInsertSimpleArray(String strRowId, String fieldPath, String cdr) {
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                TestCDPSOpenFormatJSONWithCluster.getOneMutInsertSimpleArray(fieldPath),
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mutation tail produced by {@code getOneMutReplaceSimpleField(fieldPath)}.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath field path expected in the single mutation
     * @param cdr change-data record text to verify
     */
    public static void verifyReplaceSimpleField(String strRowId, String fieldPath, String cdr) {
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                TestCDPSOpenFormatJSONWithCluster.getOneMutReplaceSimpleField(fieldPath),
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mutation tail produced by {@code getOneMutInsertSimpleField(fieldPath)}.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath field path expected in the single mutation
     * @param cdr change-data record text to verify
     */
    public static void verifyInsertSimpleField(String strRowId, String fieldPath, String cdr) {
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                TestCDPSOpenFormatJSONWithCluster.getOneMutInsertSimpleField(fieldPath),
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code ArrayInArray} to the value expected at step L1.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldL1(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"ArrayInArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"teacher1\",[\"G1\",\"G2\"],[{\"$numberInt\":91},{\"$numberInt\":92},{\"$numberInt\":93}]]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code ArrayInArray} to the value expected at step L2.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldL2(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"ArrayInArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"teacher1\",[\"G1\",\"K\"],[{\"$numberInt\":91},{\"$numberInt\":92},{\"$numberInt\":93}]]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code ArrayInArray} to the value expected at step L3.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldL3(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"ArrayInArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"teacher1\",[\"G1\",\"K\",\"G3\"],[{\"$numberInt\":91},{\"$numberInt\":92},{\"$numberInt\":93}]]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code ArrayInArray} to the value expected at step L4.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldL4(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"ArrayInArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[[\"G1\",\"K\",\"G3\"],[{\"$numberInt\":91},{\"$numberInt\":92},{\"$numberInt\":93}]]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code ArrayInArray} to the value expected at step L5.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldL5(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"ArrayInArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[[\"G1\",\"K\",\"G3\"],[{\"$numberInt\":91},{\"$numberInt\":93}]]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code ArrayInArray} to the value expected at step L6.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldL6(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"ArrayInArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[[\"G1\",\"K\",\"G3\"],[\"replacedItem1\",\"replacedItem2\",\"replacedItem3\"]]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapB.mapC.SimpleArray} to the value expected at step M1.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldM1(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapB.mapC.SimpleArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"spring\",\"fall\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapB.mapC.SimpleArray} to the value expected at step M2.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldM2(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapB.mapC.SimpleArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"spring\",\"summer\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapB.mapC.SimpleArray} to the value expected at step M3.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldM3(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapB.mapC.SimpleArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"summer\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapB.mapC.SimpleArray} to the value expected at step M4.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldM4(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapB.mapC.SimpleArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"summer\",\"winter\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapB.mapC.SimpleArray} to the value expected at step M5.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldM5(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapB.mapC.SimpleArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"summer\",\"winter\",\"nomoreseason\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapB.mapC.SimpleArray} to the value expected at step M6.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldM6(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapB.mapC.SimpleArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"spring\",\"winter\",\"nomoreseason\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapE.NewArray} to the value expected at step N1.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldN1(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapE.NewArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"circle\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapE.NewArray} to the value expected at step N2.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldN2(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapE.NewArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"circle\",\"square\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapE.NewArray} to the value expected at step N3.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldN3(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapE.NewArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"circle\",\"square\",\"rectangle\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with a single
     * {@code $SET} of {@code mapE.NewArray} to the value expected at step N4.
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field path is fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyArrayInArrayFieldN4(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapE.NewArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"square\",\"rectangle\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mixed list of {@code $SET}/{@code $DELETE} mutations expected at step O1 (map fields
     * under mapA, mapB, mapD and mapE).
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field paths are fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyMixedFieldO1(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapA.fA\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapA.fA2\",\"$fieldOp\":\"$SET\",\"$fieldValue\":\"setMixMapAfA2\"},{\"$fieldPath\":\"mapB.fB\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.fB21\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":21}},{\"$fieldPath\":\"mapB.mapC.fBC22\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":22}},{\"$fieldPath\":\"mapB.mapC.fC\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.fBCF23\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":23}},{\"$fieldPath\":\"mapB.mapC.mapF.fF\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapD\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapE\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"fE6\":\"setMixMapEfB6\",\"fE7\":\"setMixMapEfB7\"}}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * mixed list of {@code $SET}/{@code $DELETE} mutations expected at step O2 (map fields
     * under mapB, mapD and mapE).
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field paths are fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyMixedFieldO2(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapB.fB\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.fB21\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":21}},{\"$fieldPath\":\"mapB.mapC.fBC22\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":22}},{\"$fieldPath\":\"mapB.mapC.fC\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.fBCF23\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":23}},{\"$fieldPath\":\"mapB.mapC.mapF.fF\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapD\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapE\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"fE6\":\"setMixMapEfB6\",\"fE7\":\"setMixMapEfB7\"}}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * array mutations expected at step O3, in the order written in this method's expected
     * text ({@code mapA.mArray} first).
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field paths are fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyMixedFieldO3mcf(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapA.mArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[{\"$numberInt\":210}]},{\"$fieldPath\":\"arrayG\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NG0\",\"G2\"]},{\"$fieldPath\":\"arrayH\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"arrayI\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NI0\",\"NI1\",\"NI3\"]},{\"$fieldPath\":\"mapB.arrayJ\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"J0\",\"J1\",\"NJ3\"]},{\"$fieldPath\":\"mapB.mapC.arrayK\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.arrayL\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NL0\",\"NL1\",\"NL3\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * array mutations expected at step O3, in the order written in this method's expected
     * text ({@code arrayG} first, {@code mapA.mArray} after {@code arrayI}).
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field paths are fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyMixedFieldO3scf(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"arrayG\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NG0\",\"G2\"]},{\"$fieldPath\":\"arrayH\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"arrayI\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NI0\",\"NI1\",\"NI3\"]},{\"$fieldPath\":\"mapA.mArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[{\"$numberInt\":210}]},{\"$fieldPath\":\"mapB.arrayJ\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"J0\",\"J1\",\"NJ3\"]},{\"$fieldPath\":\"mapB.mapC.arrayK\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.arrayL\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NL0\",\"NL1\",\"NL3\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * array mutations expected at step O4 (arrayG, arrayH, arrayI and the arrays under mapB).
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field paths are fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyMixedFieldO4mcf(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"arrayG\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NG0\",\"G2\"]},{\"$fieldPath\":\"arrayH\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"arrayI\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NI0\",\"NI1\",\"NI3\"]},{\"$fieldPath\":\"mapB.arrayJ\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"J0\",\"J1\",\"NJ3\"]},{\"$fieldPath\":\"mapB.mapC.arrayK\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.arrayL\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NL0\",\"NL1\",\"NL3\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * array mutations expected at step O4 (arrayG, arrayH, arrayI and the arrays under mapB).
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field paths are fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyMixedFieldO4scf(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"arrayG\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NG0\",\"G2\"]},{\"$fieldPath\":\"arrayH\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"arrayI\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NI0\",\"NI1\",\"NI3\"]},{\"$fieldPath\":\"mapB.arrayJ\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"J0\",\"J1\",\"NJ3\"]},{\"$fieldPath\":\"mapB.mapC.arrayK\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.arrayL\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NL0\",\"NL1\",\"NL3\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * combined map and array mutations expected at step O5, in the order written in this
     * method's expected text ({@code mapA.fA} first, top-level arrays last).
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field paths are fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyMixedFieldO5mcf(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"mapA.fA\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapA.fA2\",\"$fieldOp\":\"$SET\",\"$fieldValue\":\"setMixMapAfA2\"},{\"$fieldPath\":\"mapA.mArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[{\"$numberInt\":210}]},{\"$fieldPath\":\"mapB.fB\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.fB21\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":21}},{\"$fieldPath\":\"mapB.mapC.fBC22\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":22}},{\"$fieldPath\":\"mapB.mapC.fC\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.fBCF23\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":23}},{\"$fieldPath\":\"mapB.mapC.mapF.fF\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.arrayL\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NL0\",\"NL1\",\"NL3\"]},{\"$fieldPath\":\"mapB.mapC.arrayK\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.arrayJ\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"J0\",\"J1\",\"NJ3\"]},{\"$fieldPath\":\"mapD\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapE\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"fE6\":\"setMixMapEfB6\",\"fE7\":\"setMixMapEfB7\"}},{\"$fieldPath\":\"arrayG\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NG0\",\"G2\"]},{\"$fieldPath\":\"arrayH\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"arrayI\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NI0\",\"NI1\",\"NI3\"]}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Verifies that the record is a RECORD_UPDATE for {@code strRowId} ending with the
     * combined map and array mutations expected at step O5, in the order written in this
     * method's expected text ({@code arrayG} first, {@code mapE} last).
     *
     * @param strRowId expected row id in the record head
     * @param fieldPath not consulted; the expected field paths are fixed in the expected text
     * @param cdr change-data record text to verify
     */
    public static void verifyMixedFieldO5scf(String strRowId, String fieldPath, String cdr) {
        final String expectedBody = ",\"$mutations\":[{\"$fieldPath\":\"arrayG\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NG0\",\"G2\"]},{\"$fieldPath\":\"arrayH\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"arrayI\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NI0\",\"NI1\",\"NI3\"]},{\"$fieldPath\":\"mapA.fA\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapA.fA2\",\"$fieldOp\":\"$SET\",\"$fieldValue\":\"setMixMapAfA2\"},{\"$fieldPath\":\"mapA.mArray\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[{\"$numberInt\":210}]},{\"$fieldPath\":\"mapB.arrayJ\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"J0\",\"J1\",\"NJ3\"]},{\"$fieldPath\":\"mapB.fB\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.fB21\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":21}},{\"$fieldPath\":\"mapB.mapC.arrayK\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.fBC22\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":22}},{\"$fieldPath\":\"mapB.mapC.fC\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapB.mapC.mapF.arrayL\",\"$fieldOp\":\"$SET\",\"$fieldValue\":[\"NL0\",\"NL1\",\"NL3\"]},{\"$fieldPath\":\"mapB.mapC.mapF.fBCF23\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"$numberInt\":23}},{\"$fieldPath\":\"mapB.mapC.mapF.fF\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapD\",\"$fieldOp\":\"$DELETE\"},{\"$fieldPath\":\"mapE\",\"$fieldOp\":\"$SET\",\"$fieldValue\":{\"fE6\":\"setMixMapEfB6\",\"fE7\":\"setMixMapEfB7\"}}]}";
        TestCDPSOpenFormatJSONWithCluster.verifyData(
                TestCDPSOpenFormatJSONWithCluster.getRecHead(strRowId, ChangeDataRecordType.RECORD_UPDATE),
                expectedBody,
                cdr);
    }

    /**
     * Decodes one consumed change-data record and hands it to the verifier selected by
     * {@code vfMethod}.
     *
     * <p>Verifiers for whole-row operations receive only the row id and the record text; the
     * others additionally receive {@code putfield}. The "mixed field" O3, O4 and O5 checks have
     * separate expectations for the multi-column-family and single-column-family tables and are
     * chosen by {@code isMcf}.
     *
     * @param strRowId    row id the record is expected to describe
     * @param vfMethod    which verifier to run against the record
     * @param crec        consumed record whose value holds the change-data record bytes
     * @param idxInBucket position of the record in the fetched list; only used in the debug log
     * @param isDebug     when {@code true}, the decoded record is logged before verification
     * @param isMcf       {@code true} for the multi-column-family table, {@code false} for the
     *                    single-column-family table
     * @param putfield    field path passed to field-level verifiers; ignored by row-level ones
     * @throws IOException declared for callers; none of the statements visible here throw it
     *                     directly
     */
    public static void processOneNode(String strRowId, VerifyMethod vfMethod, ConsumerRecord<byte[], byte[]> crec, int idxInBucket, boolean isDebug, boolean isMcf, String putfield) throws IOException {
        // Decode with an explicit charset: new String(byte[]) would use the platform default,
        // which makes the comparison depend on the JVM's locale settings.
        // NOTE(review): assumes the record payload is UTF-8 encoded JSON - confirm.
        String rec = new String(crec.value(), java.nio.charset.StandardCharsets.UTF_8);
        if (isDebug) {
            _logger.info("bucket:" + idxInBucket + (isMcf ? "-mcf:" : "-scf:") + rec);
        }
        switch (vfMethod) {
            case VerifySimpleMap:
                verifySimpleMap(strRowId, rec);
                break;
            case VerifyReplaceRowMap:
                verifyReplaceRowMap(strRowId, rec);
                break;
            case VerifyInsertRowMap:
                verifyInsertRowMap(strRowId, rec);
                break;
            case VerifyReplaceCFMap:
                verifyReplaceCFMap(strRowId, putfield, rec);
                break;
            case VerifyInsertCFMap:
                verifyInsertCFMap(strRowId, putfield, rec);
                break;
            case VerifyRowDelete:
                verifyRowDelete(strRowId, rec);
                break;
            case VerifyFieldDelete:
                verifyFieldDelete(strRowId, putfield, rec);
                break;
            case VerifyReplaceSimpleArray:
                verifyReplaceSimpleArray(strRowId, putfield, rec);
                break;
            case VerifyInsertSimpleArray:
                verifyInsertSimpleArray(strRowId, putfield, rec);
                break;
            case VerifyReplaceSimpleField:
                verifyReplaceSimpleField(strRowId, putfield, rec);
                break;
            case VerifyInsertSimpleField:
                verifyInsertSimpleField(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldL1:
                verifyArrayInArrayFieldL1(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldL2:
                verifyArrayInArrayFieldL2(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldL3:
                verifyArrayInArrayFieldL3(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldL4:
                verifyArrayInArrayFieldL4(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldL5:
                verifyArrayInArrayFieldL5(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldL6:
                verifyArrayInArrayFieldL6(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldM1:
                verifyArrayInArrayFieldM1(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldM2:
                verifyArrayInArrayFieldM2(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldM3:
                verifyArrayInArrayFieldM3(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldM4:
                verifyArrayInArrayFieldM4(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldM5:
                verifyArrayInArrayFieldM5(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldM6:
                verifyArrayInArrayFieldM6(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldN1:
                verifyArrayInArrayFieldN1(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldN2:
                verifyArrayInArrayFieldN2(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldN3:
                verifyArrayInArrayFieldN3(strRowId, putfield, rec);
                break;
            case VerifyArrayInArrayFieldN4:
                verifyArrayInArrayFieldN4(strRowId, putfield, rec);
                break;
            case VerifyMixedFieldO1:
                verifyMixedFieldO1(strRowId, putfield, rec);
                break;
            case VerifyMixedFieldO2:
                verifyMixedFieldO2(strRowId, putfield, rec);
                break;
            case VerifyMixedFieldO3:
                if (isMcf) {
                    verifyMixedFieldO3mcf(strRowId, putfield, rec);
                } else {
                    verifyMixedFieldO3scf(strRowId, putfield, rec);
                }
                break;
            case VerifyMixedFieldO4:
                if (isMcf) {
                    verifyMixedFieldO4mcf(strRowId, putfield, rec);
                } else {
                    verifyMixedFieldO4scf(strRowId, putfield, rec);
                }
                break;
            case VerifyMixedFieldO5:
                if (isMcf) {
                    verifyMixedFieldO5mcf(strRowId, putfield, rec);
                } else {
                    verifyMixedFieldO5scf(strRowId, putfield, rec);
                }
                break;
            default:
                // Previously an unhandled method fell through silently, so the record looked
                // verified when nothing had been checked. Surface it without failing, since
                // callers outside this view may rely on the lenient behaviour.
                _logger.warn("no verifier registered for {}; record for {} was not checked", vfMethod, strRowId);
                break;
        }
    }

    /**
     * Verifies the records fetched for the initial table copy, in list order.
     *
     * <p>Expected layout of {@code getList}:
     * <ul>
     *   <li>index 0: checked with {@code verifyDataWithAllTypes}</li>
     *   <li>indexes up to {@code initMapNum}: checked with {@code verifySimpleMap}</li>
     *   <li>indexes up to {@code initMCFNum}: checked with {@code verifyArrayWithAllTypes}</li>
     *   <li>indexes up to {@code initRowNum - 1}: rows named {@code rowM<index>}, checked with
     *       {@code verifyMixedCFsMCF} or {@code verifyMixedCFsSCF} depending on {@code isMcf}</li>
     *   <li>one final record that is decoded (and logged when debugging) but whose content is
     *       not verified</li>
     * </ul>
     * Finally the number of records consumed must equal {@code initRowNum}.
     *
     * @param getList records fetched from the consumer
     * @param isMcf   {@code true} when the records come from the multi-column-family table
     * @param isDebug when {@code true}, every decoded record is logged
     * @throws Exception if a delegated verifier throws
     */
    private void verifyInitCopy(List<ConsumerRecord<byte[], byte[]>> getList, boolean isMcf, boolean isDebug) throws Exception {
        // Fail with a readable message instead of an IndexOutOfBoundsException when the
        // consumer returned too few records. A passing run always reads index initRowNum - 1,
        // so this cannot reject a list that used to pass.
        Assert.assertTrue("initial copy returned " + getList.size() + " records, expected at least " + initRowNum, getList.size() >= initRowNum);

        String consumerTypeStr = isMcf ? "-mcf-" : "-scf-";
        int getCount = 0;

        String strRowId = ROWID_PREFIX + getCount;
        String rec = TestCDPSOpenFormatJSONWithCluster.decodeInitCopyRecord(getList, getCount, consumerTypeStr + strRowId, isDebug);
        TestCDPSOpenFormatJSONWithCluster.verifyDataWithAllTypes(strRowId, rec);
        ++getCount;

        for (; getCount < initMapNum; ++getCount) {
            strRowId = ROWID_PREFIX + getCount;
            rec = TestCDPSOpenFormatJSONWithCluster.decodeInitCopyRecord(getList, getCount, consumerTypeStr + strRowId, isDebug);
            TestCDPSOpenFormatJSONWithCluster.verifySimpleMap(strRowId, rec);
        }

        for (; getCount < initMCFNum; ++getCount) {
            strRowId = ROWID_PREFIX + getCount;
            rec = TestCDPSOpenFormatJSONWithCluster.decodeInitCopyRecord(getList, getCount, consumerTypeStr + strRowId, isDebug);
            TestCDPSOpenFormatJSONWithCluster.verifyArrayWithAllTypes(strRowId, rec);
        }

        for (; getCount < initRowNum - 1; ++getCount) {
            strRowId = "rowM" + getCount;
            rec = TestCDPSOpenFormatJSONWithCluster.decodeInitCopyRecord(getList, getCount, consumerTypeStr + strRowId, isDebug);
            if (isMcf) {
                TestCDPSOpenFormatJSONWithCluster.verifyMixedCFsMCF(strRowId, rec);
            } else {
                TestCDPSOpenFormatJSONWithCluster.verifyMixedCFsSCF(strRowId, rec);
            }
        }

        // The last record is only decoded, not content-checked. It used to be logged under the
        // row id left over from the previous loop, which mislabelled it; log its index instead.
        // NOTE(review): presumably this is the binary-row-id record written last by the test
        // setup - confirm, and consider adding a content check for it.
        TestCDPSOpenFormatJSONWithCluster.decodeInitCopyRecord(getList, getCount, consumerTypeStr + "record[" + getCount + "]", isDebug);
        ++getCount;
        Assert.assertTrue("verified " + getCount + " initial-copy records, expected " + initRowNum, getCount == initRowNum);
    }

    /**
     * Decodes the value of the record at {@code index} as UTF-8 text and, when
     * {@code isDebug} is set, logs it as {@code label + "-" + text}.
     */
    private static String decodeInitCopyRecord(List<ConsumerRecord<byte[], byte[]>> records, int index, String label, boolean isDebug) {
        // Explicit charset: new String(byte[]) would decode with the platform default.
        // NOTE(review): assumes the record payload is UTF-8 encoded JSON - confirm.
        String text = new String(records.get(index).value(), java.nio.charset.StandardCharsets.UTF_8);
        if (isDebug) {
            _logger.info(label + "-" + text);
        }
        return text;
    }

    /**
     * Verifies the change records produced by the bucket-replication mutations.
     *
     * <p>The records in {@code getList} are consumed strictly in order, starting at index 0.
     * Each group below targets one row id and runs a fixed sequence of verifiers through
     * {@code processOneNode}:
     * <ol>
     *   <li>row replace / delete / insert on row {@code bucketPutStartRow}</li>
     *   <li>column-family level map mutations on the next row</li>
     *   <li>nested map mutations on the row after that</li>
     *   <li>simple array mutations on the following row</li>
     *   <li>simple field mutations on the following row</li>
     *   <li>array-in-array mutations (L, M and N series) on row {@code initMapNum + 1}</li>
     *   <li>mixed-field mutations O1..O5 on rows {@code rowM<initMCFNum + 1>} onwards</li>
     * </ol>
     *
     * @param getList records fetched from the consumer, in arrival order
     * @param isMcf   {@code true} when the records come from the multi-column-family table
     * @param isDebug forwarded to {@code processOneNode} to enable per-record logging
     * @throws Exception if a delegated verifier throws or the list is shorter than expected
     */
    private void verifyBucketRepl(List<ConsumerRecord<byte[], byte[]>> getList, boolean isMcf, boolean isDebug) throws Exception {
        int cursor = 0;
        int opRow = bucketPutStartRow;
        String rowKey = ROWID_PREFIX + opRow;

        // Whole-row mutations.
        cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey,
                new VerifyMethod[]{VerifyMethod.VerifyReplaceRowMap, VerifyMethod.VerifyRowDelete, VerifyMethod.VerifyInsertRowMap},
                null, isMcf, isDebug);
        _logger.info("passed verify bucket row mutation");

        // Replace / delete / insert cycle shared by the map-valued groups.
        VerifyMethod[] mapCycle = {VerifyMethod.VerifyReplaceCFMap, VerifyMethod.VerifyFieldDelete, VerifyMethod.VerifyInsertCFMap};

        // Column-family level maps; the first triple mixes "mapD" with "mapB.mapC".
        ++opRow;
        rowKey = ROWID_PREFIX + opRow;
        cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey,
                new VerifyMethod[]{VerifyMethod.VerifyReplaceCFMap}, "mapD", isMcf, isDebug);
        cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey,
                new VerifyMethod[]{VerifyMethod.VerifyFieldDelete, VerifyMethod.VerifyInsertCFMap}, "mapB.mapC", isMcf, isDebug);
        cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey, mapCycle, "mapE", isMcf, isDebug);
        _logger.info("passed verify bucket CF mutation");

        // Nested maps.
        ++opRow;
        rowKey = ROWID_PREFIX + opRow;
        for (String path : new String[]{"mapA.mapAX", "mapD.mapDX", "mapE.mapEX"}) {
            cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey, mapCycle, path, isMcf, isDebug);
        }
        _logger.info("passed verify bucket map mutation");

        // Simple arrays.
        ++opRow;
        rowKey = ROWID_PREFIX + opRow;
        VerifyMethod[] arrayCycle = {VerifyMethod.VerifyReplaceSimpleArray, VerifyMethod.VerifyFieldDelete, VerifyMethod.VerifyInsertSimpleArray};
        for (String path : new String[]{"mapA.arrayAX", "mapB.mapC.arrayAX", "mapE.arrayAX"}) {
            cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey, arrayCycle, path, isMcf, isDebug);
        }
        _logger.info("passed verify bucket array mutation");

        // Simple fields.
        ++opRow;
        rowKey = ROWID_PREFIX + opRow;
        VerifyMethod[] fieldCycle = {VerifyMethod.VerifyReplaceSimpleField, VerifyMethod.VerifyFieldDelete, VerifyMethod.VerifyInsertSimpleField};
        for (String path : new String[]{"mfield", "mapB.mfield", "mapE.mfield"}) {
            cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey, fieldCycle, path, isMcf, isDebug);
        }
        _logger.info("passed verify map field mutation");

        // Array-in-array series. L5 and N4 are each expected for two consecutive records.
        int arrayFieldRow = initMapNum + 1;
        rowKey = ROWID_PREFIX + arrayFieldRow;
        cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey,
                new VerifyMethod[]{
                    VerifyMethod.VerifyArrayInArrayFieldL1, VerifyMethod.VerifyArrayInArrayFieldL2,
                    VerifyMethod.VerifyArrayInArrayFieldL3, VerifyMethod.VerifyArrayInArrayFieldL4,
                    VerifyMethod.VerifyArrayInArrayFieldL5, VerifyMethod.VerifyArrayInArrayFieldL5,
                    VerifyMethod.VerifyArrayInArrayFieldL6},
                "ArrayInArray", isMcf, isDebug);
        cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey,
                new VerifyMethod[]{
                    VerifyMethod.VerifyArrayInArrayFieldM1, VerifyMethod.VerifyArrayInArrayFieldM2,
                    VerifyMethod.VerifyArrayInArrayFieldM3, VerifyMethod.VerifyArrayInArrayFieldM4,
                    VerifyMethod.VerifyArrayInArrayFieldM5, VerifyMethod.VerifyArrayInArrayFieldM6},
                "mapB.mapC.SimpleArray", isMcf, isDebug);
        cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, rowKey,
                new VerifyMethod[]{
                    VerifyMethod.VerifyArrayInArrayFieldN1, VerifyMethod.VerifyArrayInArrayFieldN2,
                    VerifyMethod.VerifyArrayInArrayFieldN3, VerifyMethod.VerifyArrayInArrayFieldN4,
                    VerifyMethod.VerifyArrayInArrayFieldN4},
                "mapE.NewArray", isMcf, isDebug);

        // Mixed-field mutations: one record per consecutive "rowM" row.
        int mixedRow = initMCFNum + 1;
        VerifyMethod[] mixedSteps = {
            VerifyMethod.VerifyMixedFieldO1, VerifyMethod.VerifyMixedFieldO2, VerifyMethod.VerifyMixedFieldO3,
            VerifyMethod.VerifyMixedFieldO4, VerifyMethod.VerifyMixedFieldO5};
        for (VerifyMethod step : mixedSteps) {
            cursor = TestCDPSOpenFormatJSONWithCluster.verifyBucketSteps(getList, cursor, "rowM" + mixedRow,
                    new VerifyMethod[]{step}, null, isMcf, isDebug);
            ++mixedRow;
        }
    }

    /**
     * Runs {@code steps} in order against consecutive records starting at {@code startIdx},
     * all for the same row id and field path, and returns the index following the last
     * record consumed.
     */
    private static int verifyBucketSteps(List<ConsumerRecord<byte[], byte[]>> records, int startIdx, String rowKey, VerifyMethod[] steps, String fieldPath, boolean isMcf, boolean isDebug) throws IOException {
        int idx = startIdx;
        for (VerifyMethod step : steps) {
            TestCDPSOpenFormatJSONWithCluster.processOneNode(rowKey, step, records.get(idx), idx, isDebug, isMcf, fieldPath);
            ++idx;
        }
        return idx;
    }

    @Test
    public void testJsonConsumer() throws Exception {
        String jsonMcfSrcTableName = null;
        String jsonScfSrcTableName = null;
        String cdpsDstTableName = null;
        String cdpsMcfDstTopicName = null;
        String cdpsScfDstTopicName = null;
        Table jsonMcfSrcTable = null;
        Table jsonScfSrcTable = null;
        ArrayList<Document> presetValuesMcf = null;
        ArrayList<Document> presetValuesScf = null;
        ArrayList<String> presetFieldNamesScf = null;
        ArrayList<String> presetFieldNamesMcf = null;
        int putCount = 0;
        Document arrayWithAllTypeRec = null;
        Document binaryIdRec = null;
        jsonMcfSrcTableName = SRC_MCF_TABLE_NAME;
        jsonScfSrcTableName = SRC_SCF_TABLE_NAME;
        cdpsDstTableName = DST_TABLE_NAME;
        cdpsMcfDstTopicName = cdpsDstTableName + ":jtOpenFormatJsonMcf";
        cdpsScfDstTopicName = cdpsDstTableName + ":jtOpenFormatJsonScf";
        _logger.info("srcMcf:" + jsonMcfSrcTableName + "srcScf:" + jsonScfSrcTableName + " dst:" + cdpsDstTableName + " dstMcfTopic:" + cdpsMcfDstTopicName + " dstScfTopic:" + cdpsScfDstTopicName);
        presetValuesMcf = new ArrayList<Document>();
        presetValuesScf = new ArrayList<Document>();
        presetFieldNamesMcf = new ArrayList<String>();
        presetFieldNamesScf = new ArrayList<String>();
        Document putrecMcf = null;
        Document putrecScf = null;
        try {
            putCount = 0;
            jsonMcfSrcTable = TestCDPSUtil.replaceMcfJsonTable2(testAdmin, jsonMcfSrcTableName);
            jsonScfSrcTable = TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonScfSrcTableName);
            String strRowId = ROWID_PREFIX + putCount;
            putrecMcf = TestCDPSOpenFormatJSONWithCluster.putDataWithAllTypes(strRowId, jsonMcfSrcTable);
            putrecScf = TestCDPSOpenFormatJSONWithCluster.putDataWithAllTypes(strRowId, jsonScfSrcTable);
            presetValuesMcf.add(putrecMcf);
            presetValuesScf.add(putrecScf);
            presetFieldNamesMcf.add("");
            presetFieldNamesScf.add("");
            ++putCount;
            initMapNum = 7;
            while (putCount < initMapNum) {
                strRowId = ROWID_PREFIX + putCount;
                _logger.info("---put simpleMap to " + strRowId + " to table " + jsonMcfSrcTableName);
                putrecMcf = TestCDPSJSONWithCluster.putSimpleMap(strRowId, jsonMcfSrcTable);
                _logger.info("---put simpleMap to " + strRowId + " to table " + jsonScfSrcTableName);
                putrecScf = TestCDPSJSONWithCluster.putSimpleMap(strRowId, jsonScfSrcTable);
                presetValuesMcf.add(putrecMcf);
                presetValuesScf.add(putrecScf);
                presetFieldNamesMcf.add("");
                presetFieldNamesScf.add("");
                ++putCount;
            }
            initMCFNum = 10;
            while (putCount < initMCFNum) {
                strRowId = ROWID_PREFIX + putCount;
                _logger.info("---put ArrayWithAllTypes to " + strRowId + " to table " + jsonMcfSrcTableName);
                putrecMcf = TestCDPSJSONWithCluster.putArrayWithAllTypes(strRowId, jsonMcfSrcTable);
                _logger.info("---put ArrayWithAllTypes to " + strRowId + " to table " + jsonScfSrcTableName);
                putrecScf = TestCDPSJSONWithCluster.putArrayWithAllTypes(strRowId, jsonScfSrcTable);
                presetValuesMcf.add(putrecMcf);
                presetValuesScf.add(putrecScf);
                presetFieldNamesMcf.add("");
                presetFieldNamesScf.add("");
                ++putCount;
            }
            arrayWithAllTypeRec = putrecScf;
            initRowNum = 20;
            while (putCount < initRowNum) {
                strRowId = "rowM" + putCount;
                _logger.info("---put mixed record to " + strRowId + " to table " + jsonMcfSrcTableName);
                putrecMcf = TestCDPSJSONWithCluster.putMixMapArray(strRowId, jsonMcfSrcTable);
                _logger.info("---put mixed record to " + strRowId + " to table " + jsonScfSrcTableName);
                putrecScf = TestCDPSJSONWithCluster.putMixMapArray(strRowId, jsonScfSrcTable);
                presetValuesMcf.add(putrecMcf);
                presetValuesScf.add(putrecScf);
                presetFieldNamesMcf.add("");
                presetFieldNamesScf.add("");
                ++putCount;
            }
            ++initRowNum;
            byte[] rowid = new byte[]{0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120};
            binaryIdRec = TestCDPSOpenFormatJSONWithCluster.putBinaryRow(rowid, jsonMcfSrcTable);
            binaryIdRec = TestCDPSOpenFormatJSONWithCluster.putBinaryRow(rowid, jsonScfSrcTable);
            presetValuesMcf.add(binaryIdRec);
            presetValuesScf.add(binaryIdRec);
            presetFieldNamesMcf.add("");
            presetFieldNamesScf.add("");
            ++putCount;
            TestCDPSUtil.replaceStreamTable(cdpsDstTableName);
            TestCDPSUtil.setupCDPSReplica(jsonMcfSrcTableName, cdpsDstTableName, cdpsMcfDstTopicName);
            TestCDPSUtil.setupCDPSReplica(jsonScfSrcTableName, cdpsDstTableName, cdpsScfDstTopicName);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        KafkaConsumer<byte[], byte[]> consumerMcf = null;
        KafkaConsumer<byte[], byte[]> consumerScf = null;
        KafkaConsumer<byte[], byte[]> consumerMcfGrp = null;
        KafkaConsumer<byte[], byte[]> consumerScfGrp = null;
        List<ConsumerRecord<byte[], byte[]>> getListMcf = null;
        List<ConsumerRecord<byte[], byte[]>> getListScf = null;
        List<ConsumerRecord<byte[], byte[]>> getListMcfGrp = null;
        List<ConsumerRecord<byte[], byte[]>> getListScfGrp = null;
        Document mutrecMcf = null;
        Document mutrecScf = null;
        boolean isDebug = true;
        consumerMcf = TestCDPSUtil.startByteArrayConsumer(cdpsMcfDstTopicName, false, null);
        consumerScf = TestCDPSUtil.startByteArrayConsumer(cdpsScfDstTopicName, false, null);
        consumerMcfGrp = TestCDPSUtil.startByteArrayConsumer(cdpsMcfDstTopicName, true, "cdcOfJsonMcf1");
        consumerScfGrp = TestCDPSUtil.startByteArrayConsumer(cdpsScfDstTopicName, true, "cdcOfJsonScf1");
        getListMcf = TestCDPSUtil.fetchByteArrayData(initRowNum, consumerMcf);
        getListScf = TestCDPSUtil.fetchByteArrayData(initRowNum, consumerScf);
        getListMcfGrp = TestCDPSUtil.fetchByteArrayData(initRowNum, consumerMcfGrp);
        getListScfGrp = TestCDPSUtil.fetchByteArrayData(initRowNum, consumerScfGrp);
        this.verifyInitCopy(getListMcf, true, isDebug);
        this.verifyInitCopy(getListScf, false, isDebug);
        this.verifyInitCopy(getListMcfGrp, true, isDebug);
        this.verifyInitCopy(getListScfGrp, false, isDebug);
        _logger.info("passed verify mutations from copyRegion, getCount " + initRowNum);
        _logger.info("putCount " + putCount + ". Now generating mutations for bucket replication:");
        _logger.info("bucket row put start at " + putCount);
        int rowOpId = bucketPutStartRow;
        String strRowId = ROWID_PREFIX + rowOpId;
        mutrecMcf = TestCDPSJSONWithCluster.replaceRow(strRowId, jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceRow(strRowId, jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("");
        presetFieldNamesScf.add("");
        ++putCount;
        TestCDPSJSONWithCluster.deleteRow(strRowId, jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteRow(strRowId, jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("");
        presetFieldNamesScf.add("");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertRow(strRowId, jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertRow(strRowId, jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("");
        presetFieldNamesScf.add("");
        _logger.info("bucket CF put start at " + ++putCount);
        int cfOpId = rowOpId + 1;
        strRowId = ROWID_PREFIX + cfOpId;
        mutrecMcf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapD", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapD", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapD");
        presetFieldNamesScf.add("mapD");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapB.mapC", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapB.mapC", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapB.mapC");
        presetFieldNamesScf.add("mapB.mapC");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapB.mapC", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapB.mapC", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC");
        presetFieldNamesScf.add("mapB.mapC");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapE", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapE", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE");
        presetFieldNamesScf.add("mapE");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapE", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapE", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapE");
        presetFieldNamesScf.add("mapE");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapE", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapE", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE");
        presetFieldNamesScf.add("mapE");
        _logger.info("bucket map put start at " + ++putCount);
        int mapOpId = cfOpId + 1;
        strRowId = ROWID_PREFIX + mapOpId;
        mutrecMcf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapA.mapAX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapA.mapAX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapA.mapAX");
        presetFieldNamesScf.add("mapA.mapAX");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapA.mapAX", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapA.mapAX", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapA.mapAX");
        presetFieldNamesScf.add("mapA.mapAX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapA.mapAX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapA.mapAX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapA.mapAX");
        presetFieldNamesScf.add("mapA.mapAX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapD.mapDX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapD.mapDX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapD.mapDX");
        presetFieldNamesScf.add("mapD.mapDX");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapD.mapDX", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapD.mapDX", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapD.mapDX");
        presetFieldNamesScf.add("mapD.mapDX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapD.mapDX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapD.mapDX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapD.mapDX");
        presetFieldNamesScf.add("mapD.mapDX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapE.mapEX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceMap(strRowId, "mapE.mapEX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.mapEX");
        presetFieldNamesScf.add("mapE.mapEX");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapE.mapEX", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapE.mapEX", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapE.mapEX");
        presetFieldNamesScf.add("mapE.mapEX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapE.mapEX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertMap(strRowId, "mapE.mapEX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.mapEX");
        presetFieldNamesScf.add("mapE.mapEX");
        _logger.info("bucket array put start at " + ++putCount);
        int aryOpId = mapOpId + 1;
        strRowId = ROWID_PREFIX + aryOpId;
        mutrecMcf = TestCDPSJSONWithCluster.replaceArray(strRowId, "mapA.arrayAX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceArray(strRowId, "mapA.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapA.arrayAX");
        presetFieldNamesScf.add("mapA.arrayAX");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapA.arrayAX", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapA.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapA.arrayAX");
        presetFieldNamesScf.add("mapA.arrayAX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertArray(strRowId, "mapA.arrayAX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertArray(strRowId, "mapA.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapA.arrayAX");
        presetFieldNamesScf.add("mapA.arrayAX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.replaceArray(strRowId, "mapB.mapC.arrayAX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceArray(strRowId, "mapB.mapC.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC.arrayAX");
        presetFieldNamesScf.add("mapB.mapC.arrayAX");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapB.mapC.arrayAX", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapB.mapC.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapB.mapC.arrayAX");
        presetFieldNamesScf.add("mapB.mapC.arrayAX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertArray(strRowId, "mapB.mapC.arrayAX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertArray(strRowId, "mapB.mapC.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC.arrayAX");
        presetFieldNamesScf.add("mapB.mapC.arrayAX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.replaceArray(strRowId, "mapE.arrayAX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceArray(strRowId, "mapE.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.arrayAX");
        presetFieldNamesScf.add("mapE.arrayAX");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapE.arrayAX", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapE.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapE.arrayAX");
        presetFieldNamesScf.add("mapE.arrayAX");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertArray(strRowId, "mapE.arrayAX", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertArray(strRowId, "mapE.arrayAX", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.arrayAX");
        presetFieldNamesScf.add("mapE.arrayAX");
        _logger.info("bucket map field put start at " + ++putCount);
        int mfOpId = aryOpId + 1;
        strRowId = ROWID_PREFIX + mfOpId;
        mutrecMcf = TestCDPSJSONWithCluster.replaceField(strRowId, "mfield", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceField(strRowId, "mfield", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mfield");
        presetFieldNamesScf.add("mfield");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mfield", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mfield", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mfield");
        presetFieldNamesScf.add("mfield");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertField(strRowId, "mfield", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertField(strRowId, "mfield", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mfield");
        presetFieldNamesScf.add("mfield");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.replaceField(strRowId, "mapB.mfield", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceField(strRowId, "mapB.mfield", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mfield");
        presetFieldNamesScf.add("mapB.mfield");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapB.mfield", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapB.mfield", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapB.mfield");
        presetFieldNamesScf.add("mapB.mfield");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertField(strRowId, "mapB.mfield", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertField(strRowId, "mapB.mfield", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mfield");
        presetFieldNamesScf.add("mapB.mfield");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.replaceField(strRowId, "mapE.mfield", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.replaceField(strRowId, "mapE.mfield", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.mfield");
        presetFieldNamesScf.add("mapE.mfield");
        ++putCount;
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapE.mfield", jsonMcfSrcTable);
        TestCDPSJSONWithCluster.deleteField(strRowId, "mapE.mfield", jsonScfSrcTable);
        presetValuesMcf.add(null);
        presetValuesScf.add(null);
        presetFieldNamesMcf.add("mapE.mfield");
        presetFieldNamesScf.add("mapE.mfield");
        ++putCount;
        mutrecMcf = TestCDPSJSONWithCluster.insertField(strRowId, "mapE.mfield", jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.insertField(strRowId, "mapE.mfield", jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.mfield");
        presetFieldNamesScf.add("mapE.mfield");
        _logger.info("bucket array field put start at " + ++putCount);
        int afOpId = initMapNum + 1;
        strRowId = ROWID_PREFIX + afOpId;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "ArrayInArray[0]", "teacher1", jsonMcfSrcTable, arrayWithAllTypeRec);
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "ArrayInArray[0]", "teacher1", jsonScfSrcTable, arrayWithAllTypeRec);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("ArrayInArray[0]");
        presetFieldNamesScf.add("ArrayInArray[0]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "ArrayInArray[1][1]", "K", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "ArrayInArray[1][1]", "K", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("ArrayInArray[1][1]");
        presetFieldNamesScf.add("ArrayInArray[1][1]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "ArrayInArray[1][3]", "G3", jsonMcfSrcTable, mutrecMcf, "ArrayInArray[1][2]");
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "ArrayInArray[1][3]", "G3", jsonScfSrcTable, mutrecScf, "ArrayInArray[1][2]");
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("ArrayInArray[1][3]");
        presetFieldNamesScf.add("ArrayInArray[1][3]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "ArrayInArray[0]", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "ArrayInArray[0]", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("ArrayInArray[0]");
        presetFieldNamesScf.add("ArrayInArray[0]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "ArrayInArray[1][1]", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "ArrayInArray[1][1]", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("ArrayInArray[1][1]");
        presetFieldNamesScf.add("ArrayInArray[1][1]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, true);
        mutrecMcf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "ArrayInArray[1][5]", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "ArrayInArray[1][5]", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("ArrayInArray[1][5]");
        presetFieldNamesScf.add("ArrayInArray[1][5]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, true);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayInArray(strRowId, "ArrayInArray[1]", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayInArray(strRowId, "ArrayInArray[1]", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("ArrayInArray[1]");
        presetFieldNamesScf.add("ArrayInArray[1]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "mapB.mapC.SimpleArray[1]", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "mapB.mapC.SimpleArray[1]", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC.SimpleArray[1]");
        presetFieldNamesScf.add("mapB.mapC.SimpleArray[1]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapB.mapC.SimpleArray[1]", "summer", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapB.mapC.SimpleArray[1]", "summer", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC.SimpleArray[1]");
        presetFieldNamesScf.add("mapB.mapC.SimpleArray[1]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "mapB.mapC.SimpleArray[0]", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "mapB.mapC.SimpleArray[0]", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC.SimpleArray[0]");
        presetFieldNamesScf.add("mapB.mapC.SimpleArray[0]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapB.mapC.SimpleArray[2]", "winter", jsonMcfSrcTable, mutrecMcf, "mapB.mapC.SimpleArray[1]");
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapB.mapC.SimpleArray[2]", "winter", jsonScfSrcTable, mutrecScf, "mapB.mapC.SimpleArray[1]");
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC.SimpleArray[2]");
        presetFieldNamesScf.add("mapB.mapC.SimpleArray[2]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapB.mapC.SimpleArray[4]", "nomoreseason", jsonMcfSrcTable, mutrecMcf, "mapB.mapC.SimpleArray[2]");
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapB.mapC.SimpleArray[4]", "nomoreseason", jsonScfSrcTable, mutrecScf, "mapB.mapC.SimpleArray[2]");
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC.SimpleArray[4]");
        presetFieldNamesScf.add("mapB.mapC.SimpleArray[4]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapB.mapC.SimpleArray[0]", "spring", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapB.mapC.SimpleArray[0]", "spring", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapB.mapC.SimpleArray[0]");
        presetFieldNamesScf.add("mapB.mapC.SimpleArray[0]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapE.NewArray[3]", "circle", jsonMcfSrcTable, mutrecMcf, "mapE.NewArray[0]");
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapE.NewArray[3]", "circle", jsonScfSrcTable, mutrecScf, "mapE.NewArray[0]");
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.NewArray[3]");
        presetFieldNamesScf.add("mapE.NewArray[3]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapE.NewArray[2]", "square", jsonMcfSrcTable, mutrecMcf, "mapE.NewArray[1]");
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapE.NewArray[2]", "square", jsonScfSrcTable, mutrecScf, "mapE.NewArray[1]");
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.NewArray[2]");
        presetFieldNamesScf.add("mapE.NewArray[2]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapE.NewArray[2]", "rectangle", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.replaceArrayItem(strRowId, "mapE.NewArray[2]", "rectangle", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.NewArray[2]");
        presetFieldNamesScf.add("mapE.NewArray[2]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "mapE.NewArray[0]", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "mapE.NewArray[0]", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.NewArraymapB.mapC.SimpleArray[0]");
        presetFieldNamesScf.add("mapE.NewArraymapB.mapC.SimpleArray[0]");
        ++putCount;
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        mutrecMcf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "mapE.NewArray[2]", jsonMcfSrcTable, mutrecMcf);
        mutrecScf = TestCDPSJSONWithCluster.deleteArrayField(strRowId, "mapE.NewArray[2]", jsonScfSrcTable, mutrecScf);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("mapE.NewArray[2]");
        presetFieldNamesScf.add("mapE.NewArray[2]");
        TestCDPSUtil.fetchTableRecAndPause(strRowId, jsonMcfSrcTable, false);
        int mixOpId = initMCFNum + 1;
        strRowId = "rowM" + mixOpId;
        _logger.info("bucket mixed op put start at " + ++putCount);
        mutrecMcf = TestCDPSJSONWithCluster.mixedSetDels1(strRowId, jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.mixedSetDels1(strRowId, jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("");
        presetFieldNamesScf.add("");
        ++putCount;
        strRowId = "rowM" + ++mixOpId;
        mutrecMcf = TestCDPSJSONWithCluster.mixedSetDels2(strRowId, jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.mixedSetDels2(strRowId, jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("");
        presetFieldNamesScf.add("");
        ++putCount;
        strRowId = "rowM" + ++mixOpId;
        mutrecMcf = TestCDPSJSONWithCluster.mixedSetDels3(strRowId, jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.mixedSetDels3(strRowId, jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("");
        presetFieldNamesScf.add("");
        ++putCount;
        strRowId = "rowM" + ++mixOpId;
        mutrecMcf = TestCDPSJSONWithCluster.mixedSetDels4(strRowId, jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.mixedSetDels4(strRowId, jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("");
        presetFieldNamesScf.add("");
        ++putCount;
        strRowId = "rowM" + ++mixOpId;
        mutrecMcf = TestCDPSJSONWithCluster.mixedSetDels5(strRowId, jsonMcfSrcTable);
        mutrecScf = TestCDPSJSONWithCluster.mixedSetDels5(strRowId, jsonScfSrcTable);
        presetValuesMcf.add(mutrecMcf);
        presetValuesScf.add(mutrecScf);
        presetFieldNamesMcf.add("");
        presetFieldNamesScf.add("");
        ++mixOpId;
        _logger.info("Done bucket put, total putCount " + ++putCount);
        boolean isMcf = true;
        int bucketPutCount = putCount - initRowNum;
        getListMcf = TestCDPSUtil.fetchByteArrayDataWithBreak(bucketPutCount, consumerMcf, 240);
        _logger.info("Done bucket fetch Mcf, bucketPutCount " + bucketPutCount + " get count" + getListMcf.size());
        getListScf = TestCDPSUtil.fetchByteArrayDataWithBreak(bucketPutCount, consumerScf, 240);
        _logger.info("Done bucket fetch Scf, bucketPutCount " + bucketPutCount + " get count" + getListScf.size());
        getListMcfGrp = TestCDPSUtil.fetchByteArrayDataWithBreak(bucketPutCount, consumerMcfGrp, 240);
        _logger.info("Done bucket fetch McfGrp, bucketPutCount " + bucketPutCount + " get count" + getListMcfGrp.size());
        getListScfGrp = TestCDPSUtil.fetchByteArrayDataWithBreak(bucketPutCount, consumerScfGrp, 240);
        _logger.info("Done bucket fetch Scf, bucketPutCount " + bucketPutCount + " get count" + getListScfGrp.size());
        Assert.assertEquals((long)bucketPutCount, (long)getListMcf.size());
        Assert.assertEquals((long)bucketPutCount, (long)getListScf.size());
        Assert.assertEquals((long)bucketPutCount, (long)getListMcfGrp.size());
        Assert.assertEquals((long)bucketPutCount, (long)getListScfGrp.size());
        this.verifyBucketRepl(getListMcf, true, isDebug);
        this.verifyBucketRepl(getListScf, false, isDebug);
        this.verifyBucketRepl(getListMcfGrp, true, isDebug);
        this.verifyBucketRepl(getListScfGrp, false, isDebug);
        if (jsonMcfSrcTable != null) {
            // empty if block
        }
        if (jsonScfSrcTable != null) {
            // empty if block
        }
        if (MapRDBImpl.tableExists((String)cdpsDstTableName)) {
            // empty if block
        }
    }

    /**
     * Applies an append mutation to a single row of a JSON table.
     *
     * @param rowid     id of the row to update
     * @param fieldName field path handed to the append mutation
     * @param jsonTable store on which the update is issued
     * @param inArray   elements handed to the append mutation
     * @throws IOException declared in the signature for callers
     */
    public static void concatArray(String rowid, String fieldName, DocumentStore jsonTable, List<Object> inArray) throws IOException {
        DocumentMutation appendOp = MapRDBImpl.newMutation();
        jsonTable.update(rowid, appendOp.append(fieldName, inArray));
    }

    /**
     * Applies a merge mutation to a single row of a JSON table.
     *
     * @param rowid     id of the row to update
     * @param fieldName field path handed to the merge mutation
     * @param jsonTable store on which the update is issued
     * @param inMap     document handed to the merge mutation
     * @throws IOException declared in the signature for callers
     */
    public static void mergeMap(String rowid, String fieldName, DocumentStore jsonTable, Document inMap) throws IOException {
        DocumentMutation mergeOp = MapRDBImpl.newMutation();
        jsonTable.update(rowid, mergeOp.merge(fieldName, inMap));
    }

    /**
     * Issues a long run of append and merge mutations against one row and
     * checks that a change record can be fetched for each of them.
     *
     * <p>Steps: insert one row with an array and a map, set up replication to
     * the change-log topic, fetch the record for the initial row, then apply
     * {@code rbCount1} array appends followed by {@code rbCount2} map merges
     * and fetch {@code rbCount1 + rbCount2} records. Record contents are only
     * logged (every 500th for the mutation phase); they are not compared
     * against expected values.
     *
     * <p>The table and the consumer are released in a {@code finally} block so
     * that a failing run does not leak them.
     *
     * @throws Exception on any setup, table, or stream failure
     */
    @Test
    public void testAppMerge() throws Exception {
        int riCount = 1;
        int rbCount1 = 1000;
        int rbCount2 = 1000;
        String rowid = "row1";
        String jsrc1 = "/tmp/jAppMerge1";
        String chglogdst1 = "/tmp/chgAppMerge1";
        String topicFullName = chglogdst1 + ":jAppMerge1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        JsonTable jsonTable = MapRDBImpl.getTable(jsrc1);
        KafkaConsumer<byte[], String> jConsumer = null;
        try {
            TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
            DBDocumentImpl rec = new DBDocumentImpl();
            rec.set("arrayA1[0]", "mapreduce");
            rec.set("arrayA1[1]", 2);
            rec.set("arrayA1[2]", false);
            rec.set("mapA1.fs", "HDFS");
            rec.set("mapA1.support", true);
            rec.set("mapA1.version", 3);
            jsonTable.insertOrReplace(rowid, (Document)rec);
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, null);
            jConsumer = TestCDPSUtil.startStringConsumer(topicFullName, false, null);

            // Phase 1: the record produced for the row that existed before replication was set up.
            List<ConsumerRecord<byte[], String>> getList = TestCDPSUtil.fetchStringData(riCount, jConsumer);
            Assert.assertTrue("copyRegion: expected at least " + riCount + " records, got " + getList.size(),
                    getList.size() >= riCount);
            for (int i = 0; i < riCount; ++i) {
                String cdr = getList.get(i).value();
                _logger.info("copyRegion " + i + ":" + rowid + "-" + cdr);
            }

            // Phase 2a: each iteration appends a fresh two-element list to the array.
            for (int i = 0; i < rbCount1; ++i) {
                List<Object> inArray1 = new ArrayList<Object>();
                inArray1.add(Float.valueOf(i));
                inArray1.add("spark" + i);
                TestCDPSOpenFormatJSONWithCluster.concatArray(rowid, "arrayA1", (DocumentStore)jsonTable, inArray1);
            }

            // Phase 2b: the same document instance is reused, so it gains one more
            // "mfsVer<i>" field on every iteration before being merged.
            DBDocumentImpl recmut = new DBDocumentImpl();
            for (int i = 0; i < rbCount2; ++i) {
                recmut.set("mfs", "MapRFS");
                recmut.set("mfsVer" + i, i);
                TestCDPSOpenFormatJSONWithCluster.mergeMap(rowid, "mapA1", (DocumentStore)jsonTable, (Document)recmut);
            }

            int rbCount = rbCount1 + rbCount2;
            getList = TestCDPSUtil.fetchStringData(rbCount, jConsumer);
            // Fail with a readable message instead of an IndexOutOfBoundsException on a short fetch.
            Assert.assertTrue("bucketRepl: expected at least " + rbCount + " records, got " + getList.size(),
                    getList.size() >= rbCount);
            for (int i = 0; i < rbCount; ++i) {
                String cdr = getList.get(i).value();
                if (i % 500 != 0) {
                    continue;
                }
                _logger.info("bucketRepl " + i + ":" + rowid + "-" + cdr);
            }
        } finally {
            try {
                if (jConsumer != null) {
                    jConsumer.close();
                }
            } finally {
                jsonTable.close();
            }
        }
    }

    /**
     * Checks that the {@code $opTime} of change records moves forward: the
     * record for the initial row must be later than the test start time, and
     * the record for a subsequent array append must be later than the first.
     *
     * <p>Each fetched record is parsed on its own. Reading the second
     * timestamp from the document parsed for the first record would compare
     * the first timestamp with itself and could never satisfy the
     * strictly-greater check.
     *
     * <p>The table and the consumer are released in a {@code finally} block so
     * that a failing run does not leak them.
     *
     * @throws Exception on any setup, table, or stream failure
     */
    @Test
    public void testArrayTS() throws Exception {
        TestStartTime = System.currentTimeMillis();
        boolean isDebug = true;
        int riCount = 1;
        String rowid = "row1";
        String jsrc1 = "/tmp/jAryTS1";
        String chglogdst1 = "/tmp/chgAryTS1";
        String topicFullName = chglogdst1 + ":jAryTS1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        JsonTable jsonTable = MapRDBImpl.getTable(jsrc1);
        KafkaConsumer<byte[], String> jConsumer = null;
        try {
            DBDocumentImpl rec = new DBDocumentImpl();
            rec.set("arrayA1[0]", 1);
            rec.set("arrayA1[1]", 2);
            rec.set("arrayA1[2]", 3);
            jsonTable.insertOrReplace(rowid, (Document)rec);
            TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, null);
            jConsumer = TestCDPSUtil.startStringConsumer(topicFullName, false, null);

            // First record: produced for the row inserted before replication was set up.
            List<ConsumerRecord<byte[], String>> getList = TestCDPSUtil.fetchStringData(riCount, jConsumer);
            String cdr = getList.get(0).value();
            if (isDebug) {
                _logger.info("copyRegion0: " + cdr);
            }
            long opTime1 = MapRDB.newDocument(cdr).getLong("$opTime");
            if (opTime1 <= TestStartTime) {
                _logger.error("wrong opTime first op Get:" + opTime1 + " <= test start:" + TestStartTime);
            }
            Assert.assertTrue("first opTime " + opTime1 + " must be later than test start " + TestStartTime,
                    opTime1 > TestStartTime);

            // Second record: produced by appending two elements to the array.
            List<Object> inArray1 = new ArrayList<Object>();
            inArray1.add(Float.valueOf(2.0f));
            inArray1.add("spark2");
            TestCDPSOpenFormatJSONWithCluster.concatArray(rowid, "arrayA1", (DocumentStore)jsonTable, inArray1);
            getList = TestCDPSUtil.fetchStringData(riCount, jConsumer);
            cdr = getList.get(0).value();
            if (isDebug) {
                _logger.info("bucketRepl0: " + cdr);
            }
            // Parse the newly fetched record; its timestamp is the one under test here.
            long opTime2 = MapRDB.newDocument(cdr).getLong("$opTime");
            if (opTime2 <= opTime1) {
                _logger.error("wrong opTime second op Get:" + opTime2 + " <= first op:" + opTime1);
            }
            Assert.assertTrue("second opTime " + opTime2 + " must be later than first opTime " + opTime1,
                    opTime2 > opTime1);
        } finally {
            try {
                if (jConsumer != null) {
                    jConsumer.close();
                }
            } finally {
                jsonTable.close();
            }
        }
    }

    /**
     * Verifies the header of a change record: row id and operation type
     * first, then the {@code $opTime} value via
     * {@link TestCDPSOpenFormatBinaryWithCluster#verifyOpTime}.
     *
     * @param strRowId  expected row id
     * @param RowOpType expected value of the {@code $opType} field
     * @param doc       parsed change record
     * @throws IOException declared in the signature for callers
     */
    public static void verifyHead(String strRowId, String RowOpType, Document doc) throws IOException {
        verifyHeadWithoutOpTime(strRowId, RowOpType, doc);
        TestCDPSOpenFormatBinaryWithCluster.verifyOpTime(strRowId, doc.getLong("$opTime"));
    }

    /**
     * Verifies the row id and the {@code $opType} field of a change record.
     * On a mismatch the details are written to the error log and the test is
     * failed through a JUnit assertion.
     *
     * @param strRowId  expected row id
     * @param RowOpType expected value of the {@code $opType} field
     * @param doc       parsed change record
     * @throws IOException declared in the signature for callers
     */
    public static void verifyHeadWithoutOpTime(String strRowId, String RowOpType, Document doc) throws IOException {
        final String actualRowId = doc.getId().getString();
        final boolean rowIdMatches = actualRowId.equals(strRowId);
        if (!rowIdMatches) {
            _logger.error("Wrong row id, expect " + strRowId + ", got " + actualRowId);
            Assert.assertTrue(false);
        }

        final String actualOpType = doc.getString("$opType");
        if (RowOpType.equals(actualOpType)) {
            return;
        }
        _logger.error(strRowId + ": wrong opType, expect " + RowOpType + ", got " + actualOpType);
        Assert.assertTrue(false);
    }

    /**
     * Parses a JSON change record and verifies it against an expected document.
     *
     * <p>Checks performed, in order: row id and {@code $opType} (through
     * {@link #verifyHeadWithoutOpTime}), {@code $opTime} strictly later than
     * {@code testStartTS}, and every entry of the record's {@code $$document}
     * map equal to the value of the same name in {@code expectedDoc}.
     *
     * <p>The comparison walks the fields of the fetched record only; a field
     * that is present in {@code expectedDoc} but absent from the record is
     * not reported. NOTE(review): confirm whether that one-directional check
     * is intended.
     *
     * @param strRowId    expected row id
     * @param expectedDoc document holding the expected field values
     * @param op          expected value of the {@code $opType} field
     * @param testStartTS lower bound (exclusive) for the record's {@code $opTime}
     * @param cdr         change record as a JSON string
     * @param isDebug     when true, the parsed record is written to the info log
     * @throws IOException declared in the signature for callers
     */
    public static void verify(String strRowId, Document expectedDoc, String op, long testStartTS, String cdr, boolean isDebug) throws IOException {
        Document doc = MapRDB.newDocument(cdr);
        if (isDebug) {
            _logger.info(strRowId + ": " + doc.asJsonString() + "\n");
        }
        TestCDPSOpenFormatJSONWithCluster.verifyHeadWithoutOpTime(strRowId, op, doc);

        long opTime = doc.getLong("$opTime");
        if (opTime <= testStartTS) {
            String msg = "wrong opTime Get:" + opTime + " <= GiveTs:" + testStartTS;
            _logger.error(msg);
            Assert.fail(strRowId + ": " + msg);
        }

        Map<String, Object> mutObjs = doc.getMap("$$document");
        if (mutObjs == null) {
            // Report a missing section as a test failure rather than a NullPointerException.
            String msg = strRowId + ": change record has no $$document section";
            _logger.error(msg);
            Assert.fail(msg);
        }

        for (Map.Entry<String, Object> entry : mutObjs.entrySet()) {
            String fn = entry.getKey();
            KeyValue mutFetched = (KeyValue)entry.getValue();
            KeyValue mutExpected = (KeyValue)expectedDoc.getValue(fn);
            boolean same = mutFetched == null ? mutExpected == null : mutFetched.equals((Object)mutExpected);
            if (same) {
                continue;
            }
            // Render both sides the same way so the log line can be compared by eye.
            String fetchedText = mutFetched == null ? "null" : mutFetched.asJsonString();
            String expectedText = mutExpected == null ? "null" : mutExpected.asJsonString();
            String msg = "field " + fn + " mismatch! Get:" + fetchedText + " Expect:" + expectedText;
            _logger.error(msg);
            Assert.fail(strRowId + ": " + msg);
        }
    }

    /**
     * Exercises change-data propagation for two JSON tables around column-family renames.
     *
     * <p>Flow, as visible in this method: create two tables and a stream, insert {@code row1}
     * into each table, set up replication to one topic per table, then with a non-group consumer
     * verify the {@code row1} insert, rename a column family, insert {@code row2} and verify it,
     * first for table 1A and then (after re-subscribing) for table 1B. Finally a group consumer
     * reads two records from each topic and only logs them.
     *
     * <p>Tables and consumers are held in try-with-resources so they are closed even when a
     * verification fails part-way through.
     */
    @Test
    public void testChangeCFMapping() throws Exception {
        TestStartTime = System.currentTimeMillis();
        final boolean isDebug = true;
        final int rCount = 1;
        String jsrc1A = "/tmp/jChangeCF1A";
        String jsrc1B = "/tmp/jChangeCF1B";
        String chglogdst1 = "/tmp/chgChangeCF1";
        String topicFullName1A = chglogdst1 + ":jChangeCF1A";
        String topicFullName1B = chglogdst1 + ":jChangeCF1B";
        TestCDPSUtil.replaceMcfJsonTable(testAdmin, jsrc1A);
        TestCDPSUtil.replaceMcfJsonTable2(testAdmin, jsrc1B);
        try (JsonTable jsonTable1A = MapRDBImpl.getTable(jsrc1A);
             JsonTable jsonTable1B = MapRDBImpl.getTable(jsrc1B)) {
            TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);

            // Seed one row per table before replication is set up.
            DBDocumentImpl rec1A = new DBDocumentImpl();
            rec1A.set("mapA.csA", "db1A1");
            rec1A.set("mapB.csB", "algorithm1A1");
            rec1A.set("mapE.csE", "os1A1");
            rec1A.set("arrayG[0]", "java1A1");
            rec1A.set("arrayG[1]", "c++1A1");
            jsonTable1A.insertOrReplace("row1", (Document)rec1A);
            DBDocumentImpl rec1B = new DBDocumentImpl();
            rec1B.set("mapA.csA", "db1B1");
            rec1B.set("mapB.csB", "algorithm1B1");
            rec1B.set("mapE.csE", "os1B1");
            rec1B.set("arrayG[0]", "java1B1");
            rec1B.set("arrayG[1]", "c++1B1");
            jsonTable1B.insertOrReplace("row1", (Document)rec1B);

            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1A, chglogdst1, topicFullName1A, false, null);
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1B, chglogdst1, topicFullName1B, false, null);

            try (KafkaConsumer<byte[], String> jConsumer = TestCDPSUtil.startStringConsumer(topicFullName1A, false, null)) {
                List<ConsumerRecord<byte[], String>> getList = TestCDPSUtil.fetchStringData(rCount, jConsumer);
                for (int i = 0; i < rCount; ++i) {
                    String cdr = getList.get(i).value();
                    _logger.info("Non Group consumer copyRegion1A " + i + ":" + cdr);
                    TestCDPSOpenFormatJSONWithCluster.verify("row1", (Document)rec1A, "$RECORD_INSERT", TestStartTime, cdr, isDebug);
                }

                // NOTE(review): renameCF returns an int that was never inspected; its meaning is
                // not visible here, so it is still ignored -- confirm whether it should be asserted.
                TestCDPSCLIWithCluster.renameCF(jsrc1A, "CF2", "CF22");
                rec1A = new DBDocumentImpl();
                rec1A.set("mapA.csA", "db1A2");
                rec1A.set("mapB.csB", "algorithm1A2");
                jsonTable1A.insertOrReplace("row2", (Document)rec1A);
                getList = TestCDPSUtil.fetchStringData(rCount, jConsumer);
                for (int i = 0; i < rCount; ++i) {
                    String cdr = getList.get(i).value();
                    _logger.info("Non Group consumer copyRegion1A " + i + ":" + cdr);
                    TestCDPSOpenFormatJSONWithCluster.verify("row2", (Document)rec1A, "$RECORD_INSERT", TestStartTime, cdr, isDebug);
                }

                // Switch the same consumer over to the second table's topic.
                TestCDPSUtil.unsubscribeStringConsumer(jConsumer);
                TestCDPSUtil.subscribeStringConsumer(topicFullName1B, jConsumer);
                getList = TestCDPSUtil.fetchStringData(rCount, jConsumer);
                for (int i = 0; i < rCount; ++i) {
                    String cdr = getList.get(i).value();
                    _logger.info("Non Group consumer copyRegion1B " + i + ":" + cdr);
                    TestCDPSOpenFormatJSONWithCluster.verify("row1", (Document)rec1B, "$RECORD_INSERT", TestStartTime, cdr, isDebug);
                }

                // NOTE(review): return value ignored here as well -- see the note on the first rename.
                TestCDPSCLIWithCluster.renameCF(jsrc1B, "CF7", "CF77");
                rec1B = new DBDocumentImpl();
                rec1B.set("mapB.mapC.csBC", "noSQLB2");
                rec1B.set("arrayG[0]", "pythonGG2");
                jsonTable1B.insertOrReplace("row2", (Document)rec1B);
                getList = TestCDPSUtil.fetchStringData(rCount, jConsumer);
                for (int i = 0; i < rCount; ++i) {
                    String cdr = getList.get(i).value();
                    _logger.info("Non Group consumer copyRegion1B " + i + ":" + cdr);
                    TestCDPSOpenFormatJSONWithCluster.verify("row2", (Document)rec1B, "$RECORD_INSERT", TestStartTime, cdr, isDebug);
                }
            }

            // Second pass: both records of each topic are fetched and logged, not verified.
            // NOTE(review): this consumer is started with (true, "subUnsubTopic"), presumably a
            // group consumer, yet the log text below still says "Non Group consumer" -- confirm.
            try (KafkaConsumer<byte[], String> jGConsumer = TestCDPSUtil.startStringConsumer(topicFullName1A, true, "subUnsubTopic")) {
                List<ConsumerRecord<byte[], String>> getList = TestCDPSUtil.fetchStringData(2 * rCount, jGConsumer);
                for (int i = 0; i < 2 * rCount; ++i) {
                    String cdr = getList.get(i).value();
                    _logger.info("Non Group consumer copyRegion1A " + i + ":" + cdr);
                }
                TestCDPSUtil.unsubscribeStringConsumer(jGConsumer);
                TestCDPSUtil.subscribeStringConsumer(topicFullName1B, jGConsumer);
                getList = TestCDPSUtil.fetchStringData(2 * rCount, jGConsumer);
                for (int i = 0; i < 2 * rCount; ++i) {
                    String cdr = getList.get(i).value();
                    _logger.info("Non Group consumer copyRegion1B " + i + ":" + cdr);
                }
            }
        }
    }

    /**
     * Looks up a row by id and logs it at info level.
     *
     * @param msg       text placed at the start of the log line
     * @param strRowId  id passed to {@code findById}
     * @param jsonTable table to read the row from
     * @throws IOException declared for callers of this helper
     */
    public static void printRow(String msg, String strRowId, Table jsonTable) throws IOException {
        final Document fetched = jsonTable.findById(strRowId);
        _logger.info("{}, {}, {}", msg, strRowId, fetched);
    }

    /**
     * Writes the same document under every given row id into every given table.
     *
     * <p>Rows are processed in order; for each row the tables are visited in array order and
     * each table is flushed right after its write.
     *
     * @param strRowIds  row ids to write
     * @param rec        document stored under each id
     * @param jsonTables tables that receive the document
     * @throws IOException declared for callers of this helper
     */
    public static void insertOrReplaceRows(String[] strRowIds, Document rec, Table[] jsonTables) throws IOException {
        final int numTables = jsonTables.length;
        for (int row = 0; row < strRowIds.length; ++row) {
            final String rowId = strRowIds[row];
            int t = 0;
            while (t < numTables) {
                final Table target = jsonTables[t++];
                target.insertOrReplace(rowId, rec);
                target.flush();
            }
        }
    }

    /**
     * Deletes every given row id from every given table.
     *
     * <p>Rows are processed in order; for each row the tables are visited in array order and
     * each table is flushed right after its delete.
     *
     * @param strRowIds  row ids to delete
     * @param jsonTables tables to delete the rows from
     * @throws IOException declared for callers of this helper
     */
    public static void deleteRows(String[] strRowIds, Table[] jsonTables) throws IOException {
        final int numTables = jsonTables.length;
        for (int row = 0; row < strRowIds.length; ++row) {
            final String rowId = strRowIds[row];
            int t = 0;
            while (t < numTables) {
                final Table target = jsonTables[t++];
                target.delete(rowId);
                target.flush();
            }
        }
    }

    /**
     * Applies a mutation to one row, logging the row via {@code printRow} before and after.
     *
     * @param tableName name appended to the before/after log labels
     * @param strRowId  id of the row to update
     * @param mut       mutation handed to {@code update}
     * @param jsonTable table holding the row; flushed after the update
     * @throws IOException declared for callers of this helper
     */
    public static void mutateRow(String tableName, String strRowId, DocumentMutation mut, Table jsonTable) throws IOException {
        final String beforeLabel = "Before upate " + tableName;
        printRow(beforeLabel, strRowId, jsonTable);

        jsonTable.update(strRowId, mut);
        jsonTable.flush();

        final String afterLabel = "After upate " + tableName;
        printRow(afterLabel, strRowId, jsonTable);
    }

    /**
     * Applies one mutation to every given row in every given table via {@code mutateRow}.
     *
     * <p>{@code tableNames[k]} is used as the label for {@code jsonTables[k]}; rows form the
     * outer loop and tables the inner loop.
     *
     * @param tableNames labels, index-aligned with {@code jsonTables}
     * @param strRowIds  ids of the rows to update
     * @param mut        mutation applied to each row
     * @param jsonTables tables to update
     * @throws IOException declared for callers of this helper
     */
    public static void mutateRows(String[] tableNames, String[] strRowIds, DocumentMutation mut, Table[] jsonTables) throws IOException {
        final int numTables = jsonTables.length;
        for (int row = 0; row < strRowIds.length; ++row) {
            for (int t = 0; t < numTables; ++t) {
                mutateRow(tableNames[t], strRowIds[row], mut, jsonTables[t]);
            }
        }
    }

    /**
     * Asserts that two change-data records describe the same change.
     *
     * <p>Checks, in order: the document ids are equal, the {@code $opType} strings are equal,
     * {@code $$document} is present in both or neither (and, when present, equal according to
     * {@code TestCDPSUtil.valueEquals}), and {@code $mutations} is present in both or neither.
     * When mutations are present the two lists must have the same size and every element of the
     * first list must have some equal element in the second list; order is not compared.
     *
     * @param doc1 first record
     * @param doc2 second record
     * @throws IOException declared for callers of this helper
     */
    public static void compareRecords(Document doc1, Document doc2) throws IOException {
        final Value firstId = doc1.getId();
        final Value secondId = doc2.getId();
        if (!firstId.equals(secondId)) {
            _logger.error("mismatch Id: id1:" + firstId + ", id2:" + secondId);
            Assert.assertTrue(false);
        }

        final String firstOp = doc1.getString("$opType");
        final String secondOp = doc2.getString("$opType");
        if (!firstOp.equals(secondOp)) {
            _logger.error("mismatch opType: type1:" + firstOp + ", type2:" + secondOp);
            Assert.assertTrue(false);
        }

        final Value firstBody = doc1.getValue("$$document");
        final Value secondBody = doc2.getValue("$$document");
        // Both present or both absent.
        Assert.assertTrue((firstBody == null) == (secondBody == null));
        if (firstBody != null && !TestCDPSUtil.valueEquals(firstBody, secondBody, true)) {
            _logger.error("mismatch docVal: docVal1:" + firstBody + ", docVal2:" + secondBody);
            Assert.assertTrue(false);
        }

        final Value firstMuts = doc1.getValue("$mutations");
        final Value secondMuts = doc2.getValue("$mutations");
        // Both present or both absent.
        Assert.assertTrue((firstMuts == null) == (secondMuts == null));
        if (firstMuts == null) {
            return;
        }

        final List<?> wanted = firstMuts.getList();
        final List<?> available = secondMuts.getList();
        Assert.assertEquals((long)wanted.size(), (long)available.size());
        for (final Object wantedItem : wanted) {
            final Value needle = (Value)wantedItem;
            boolean matched = false;
            for (final Object availableItem : available) {
                if (TestCDPSUtil.valueEquals(needle, (Value)availableItem)) {
                    matched = true;
                    break;
                }
            }
            if (!matched) {
                _logger.error("mV1:" + needle + "not found in muts2:" + secondMuts);
                Assert.assertTrue(false);
            }
        }
    }

    /**
     * Applies identical inserts, mutations and deletes to two JSON tables -- one created by
     * {@code replaceScfJsonTable}, one by {@code replaceMcfJsonTableNested} -- and checks that
     * the change records published to their two topics match pairwise.
     *
     * <p>The comparison happens twice: once for the 32 initial inserts and once for the 152
     * records expected from the later mutations and deletes.
     *
     * <p>Tables and consumers are held in try-with-resources so they are closed even when a
     * comparison fails. Note that several info log lines describe values that differ from the
     * ones actually set (for example the log mentions 91238 where 911238 is written); the log
     * text is left untouched.
     */
    @Test
    public void testNestedCF() throws Exception {
        TestStartTime = System.currentTimeMillis();
        // 8 row-id groups x 4 ids each are inserted into every table.
        final int insertRecordCount = 32;
        // 18 mutateRows calls x 8 rows, plus 8 deletes -- presumably one change record per
        // row operation per table; confirm against the CDC behaviour.
        final int changeRecordCount = 152;
        String jscf1 = "/tmp/jScfNestCF1";
        String jmcf1 = "/tmp/jMcfNestCF1";
        String chglogdst1 = "/tmp/chgNestCF1";
        String scfTopicFullName1 = chglogdst1 + ":jScfNestCF1";
        String mcfTopicFullName1 = chglogdst1 + ":jMcfNestCF1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jscf1);
        TestCDPSUtil.replaceMcfJsonTableNested(testAdmin, jmcf1);
        try (JsonTable jscfTable1 = MapRDBImpl.getTable(jscf1);
             JsonTable jmcfTable1 = MapRDBImpl.getTable(jmcf1)) {
            String[] tableNames = new String[]{jscf1, jmcf1};
            Table[] jsonTables = new Table[]{jscfTable1, jmcfTable1};
            TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
            String[] srId1As = new String[]{"row1A1", "row1A2", "row1A3", "row1A4"};
            String[] srId1Bs = new String[]{"row1B1", "row1B2", "row1B3", "row1B4"};
            String[] srId2As = new String[]{"row2A1", "row2A2", "row2A3", "row2A4"};
            String[] srId2Bs = new String[]{"row2B1", "row2B2", "row2B3", "row2B4"};
            String[] srId3As = new String[]{"row3A1", "row3A2", "row3A3", "row3A4"};
            String[] srId3Bs = new String[]{"row3B1", "row3B2", "row3B3", "row3B4"};
            String[] srId4As = new String[]{"row4A1", "row4A2", "row4A3", "row4A4"};
            String[] srId4Bs = new String[]{"row4B1", "row4B2", "row4B3", "row4B4"};
            String[] srId1s = new String[]{"row1A1", "row1B1", "row2A1", "row2B1", "row3A1", "row3B1", "row4A1", "row4B1"};
            String[] srId2s = new String[]{"row1A2", "row1B2", "row2A2", "row2B2", "row3A2", "row3B2", "row4A2", "row4B2"};
            String[] srId3s = new String[]{"row1A3", "row1B3", "row2A3", "row2B3", "row3A3", "row3B3", "row4A3", "row4B3"};
            String[] srId4s = new String[]{"row1A4", "row1B4", "row2A4", "row2B4", "row3A4", "row3B4", "row4A4", "row4B4"};

            // ---- Initial inserts: the "A" variants additionally carry defaultF / defaultA. ----
            DBDocumentImpl rec1A = new DBDocumentImpl();
            rec1A.set("mA.mB.mC.fC1", 11231);
            rec1A.set("mA.mB.mC.fC2", 11232);
            rec1A.set("mA.mB.aC[0]", 11236);
            rec1A.set("mA.mB.aC[1]", 11237);
            rec1A.set("defaultF", "1defaultFV1");
            rec1A.set("defaultA[0]", "1defaultAV1");
            TestCDPSOpenFormatJSONWithCluster.insertOrReplaceRows(srId1As, (Document)rec1A, jsonTables);
            DBDocumentImpl rec1B = new DBDocumentImpl();
            rec1B.set("mA.mB.mC.fC1", 11231);
            rec1B.set("mA.mB.mC.fC2", 11232);
            rec1B.set("mA.mB.aC[0]", 11236);
            rec1B.set("mA.mB.aC[1]", 11237);
            TestCDPSOpenFormatJSONWithCluster.insertOrReplaceRows(srId1Bs, (Document)rec1B, jsonTables);
            DBDocumentImpl rec2A = new DBDocumentImpl();
            rec2A.set("mA.mB.fB1", 2121);
            rec2A.set("mA.aB[0]", 2126);
            rec2A.set("mA.mB.mC.fC1", 21231);
            rec2A.set("mA.mB.mC.fC2", 21232);
            rec2A.set("mA.mB.aC[0]", 21236);
            rec2A.set("mA.mB.aC[1]", 21237);
            rec2A.set("defaultF", "2defaultFV1");
            rec2A.set("defaultA[0]", "2defaultAV1");
            TestCDPSOpenFormatJSONWithCluster.insertOrReplaceRows(srId2As, (Document)rec2A, jsonTables);
            DBDocumentImpl rec2B = new DBDocumentImpl();
            rec2B.set("mA.mB.fB1", 2121);
            rec2B.set("mA.aB[0]", 2126);
            rec2B.set("mA.mB.mC.fC1", 21231);
            rec2B.set("mA.mB.mC.fC2", 21232);
            rec2B.set("mA.mB.aC[0]", 21236);
            rec2B.set("mA.mB.aC[1]", 21237);
            TestCDPSOpenFormatJSONWithCluster.insertOrReplaceRows(srId2Bs, (Document)rec2B, jsonTables);
            DBDocumentImpl rec3A = new DBDocumentImpl();
            rec3A.set("mA.mB.mC.fC1", 31231);
            rec3A.set("mA.mB.mC.fC2", 31232);
            rec3A.set("mA.mB.aC[0]", 31236);
            rec3A.set("mA.mB.aC[1]", 31237);
            // NOTE(review): rec3B uses "mA.mB.mC.mD.fD1" for this value; "fC1" here may be a typo -- confirm.
            rec3A.set("mA.mB.mC.mD.fC1", 312341);
            rec3A.set("mA.mB.mC.aD[0]", 312346);
            rec3A.set("defaultF", "3defaultFV1");
            rec3A.set("defaultA[0]", "3defaultAV1");
            TestCDPSOpenFormatJSONWithCluster.insertOrReplaceRows(srId3As, (Document)rec3A, jsonTables);
            DBDocumentImpl rec3B = new DBDocumentImpl();
            rec3B.set("mA.mB.mC.fC1", 31231);
            rec3B.set("mA.mB.mC.fC2", 31232);
            rec3B.set("mA.mB.aC[0]", 31236);
            rec3B.set("mA.mB.aC[1]", 31237);
            rec3B.set("mA.mB.mC.mD.fD1", 312341);
            rec3B.set("mA.mB.mC.aD[0]", 312346);
            TestCDPSOpenFormatJSONWithCluster.insertOrReplaceRows(srId3Bs, (Document)rec3B, jsonTables);
            DBDocumentImpl rec4A = new DBDocumentImpl();
            rec4A.set("mA.mB.fB1", 4121);
            rec4A.set("mA.aB[0]", 4126);
            rec4A.set("mA.mB.mC.fC1", 41231);
            rec4A.set("mA.mB.mC.fC2", 41232);
            rec4A.set("mA.mB.aC[0]", 41236);
            rec4A.set("mA.mB.aC[1]", 41237);
            rec4A.set("mA.mB.mC.mD.fD1", 412341);
            rec4A.set("mA.mB.mC.aD[0]", 412346);
            rec4A.set("defaultF", "4defaultFV1");
            rec4A.set("defaultA[0]", "4defaultAV1");
            TestCDPSOpenFormatJSONWithCluster.insertOrReplaceRows(srId4As, (Document)rec4A, jsonTables);
            DBDocumentImpl rec4B = new DBDocumentImpl();
            rec4B.set("mA.mB.fB1", 4121);
            rec4B.set("mA.aB[0]", 4126);
            rec4B.set("mA.mB.mC.fC1", 41231);
            rec4B.set("mA.mB.mC.fC2", 41232);
            rec4B.set("mA.mB.aC[0]", 41236);
            rec4B.set("mA.mB.aC[1]", 41237);
            rec4B.set("mA.mB.mC.mD.fD1", 412341);
            rec4B.set("mA.mB.mC.aD[0]", 412346);
            TestCDPSOpenFormatJSONWithCluster.insertOrReplaceRows(srId4Bs, (Document)rec4B, jsonTables);

            TestCDPSUtil.setupCDPSReplicaWithColumns(jscf1, chglogdst1, scfTopicFullName1, false, null);
            TestCDPSUtil.setupCDPSReplicaWithColumns(jmcf1, chglogdst1, mcfTopicFullName1, false, null);
            try (KafkaConsumer<byte[], String> jscfConsumer = TestCDPSUtil.startStringConsumer(scfTopicFullName1, false, null);
                 KafkaConsumer<byte[], String> jmcfConsumer = TestCDPSUtil.startStringConsumer(mcfTopicFullName1, false, null)) {
                List<ConsumerRecord<byte[], String>> scfGetList = TestCDPSUtil.fetchStringData(insertRecordCount, jscfConsumer);
                List<ConsumerRecord<byte[], String>> mcfGetList = TestCDPSUtil.fetchStringData(insertRecordCount, jmcfConsumer);
                compareScfAndMcfRecords(scfGetList, mcfGetList, insertRecordCount);

                // ---- Section 1: mutations below mA.mB.mC / mA.mB.aC on the srId1s rows. ----
                _logger.info("------row1A1, row1B1, row2A1, row2B1, row3A1, row3B1, row4A1, row4B1");
                _logger.info("------mA.mB.mC.fC1->91231, mA.mB.mC.fC2->null, mA.mB.mC.fC3->91233");
                _logger.info("      mA.mB.aC[1]->91237, mA.mB.aC[0]->null, mA.mB.aC[2]->91238");
                DocumentMutation mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("mA.mB.mC.fC1", 91231);
                mutMC.delete("mA.mB.mC.fC2");
                mutMC.set("mA.mB.mC.fC3", 91233);
                mutMC.setOrReplace("mA.mB.aC[1]", 91237);
                mutMC.delete("mA.mB.aC[0]");
                mutMC.set("mA.mB.aC[2]", 911238);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------mA.mB.mC->9111");
                _logger.info("      mA.mB.aC->9111");
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("mA.mB.mC", 9111);
                mutMC.setOrReplace("mA.mB.aC", 9111);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------mA.mB.mC->[9221,9222]");
                _logger.info("      mA.mB.aC->{fC1:9226,fC2:9227}");
                ArrayList<Integer> alist = new ArrayList<>();
                alist.add(9221);
                alist.add(9222);
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("mA.mB.mC", alist);
                mutMC.setOrReplace("mA.mB.aC.fC1", 9226);
                mutMC.setOrReplace("mA.mB.aC.fC2", 9227);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------mA.mB.mC->{fC1:9331,fC2:9332}");
                _logger.info("      mA.mB.aC->[9336,9337]");
                alist = new ArrayList<>();
                alist.add(9336);
                alist.add(9337);
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("mA.mB.aC", alist);
                mutMC.setOrReplace("mA.mB.mC.fC1", 9331);
                mutMC.setOrReplace("mA.mB.mC.fC2", 9332);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------mA.mB.mC->null");
                _logger.info("      mA.mB.aC->null");
                mutMC = MapRDBImpl.newMutation();
                mutMC.delete("mA.mB.mC");
                mutMC.delete("mA.mB.aC");
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);

                // ---- Section 2: mutations below mA.mB.mC.mD / mA.mB.mC.aD on the srId2s rows. ----
                _logger.info("------row1A2, row1B2, row2A2, row2B2, row3A2, row3B2, row4A2, row4B2");
                _logger.info("------mA.mB.mC.mD.fD1->912341, mA.mB.mC.mD.fD2->null, mA.mB.mC.mD.fD3->912343");
                _logger.info("      mA.mB.mC.aD[1]->912347, mA.mB.mC.aD[0]->null, mA.mB.mC.aD[2]->912348");
                DocumentMutation mutMD = MapRDBImpl.newMutation();
                mutMD.setOrReplace("mA.mB.mC.mD.fD1", 912341);
                mutMD.delete("mA.mB.mC.mD.fD2");
                mutMD.set("mA.mB.mC.mD.fD3", 912343);
                mutMD.setOrReplace("mA.mB.mC.aD[1]", 912347);
                mutMD.delete("mA.mB.mC.aD[0]");
                mutMD.set("mA.mB.mC.aD[2]", 9112348);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId2s, mutMD, jsonTables);
                _logger.info("------mA.mB.mC.mD->91111");
                _logger.info("      mA.mB.mC.aD->91111");
                mutMD = MapRDBImpl.newMutation();
                mutMD.setOrReplace("mA.mB.mC.mD", 91111);
                mutMD.setOrReplace("mA.mB.mC.aD", 91111);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId2s, mutMD, jsonTables);
                _logger.info("------mA.mB.mC.mD->[92226,92227]");
                _logger.info("      mA.mB.mC.aD->{f1:92221,f2:92222}");
                alist = new ArrayList<>();
                alist.add(92226);
                alist.add(92227);
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("mA.mB.mC.mD", alist);
                mutMC.setOrReplace("mA.mB.mC.aD.fD1", 92221);
                mutMC.setOrReplace("mA.mB.mC.aD.fD2", 92222);
                // NOTE(review): targets srId1s although this section otherwise works on srId2s -- possibly a copy/paste slip; confirm.
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------mA.mB.mC.mD->{fD1:93331,fD2:93332}");
                _logger.info("      mA.mB.mC.aD->[93336,93337]");
                alist = new ArrayList<>();
                alist.add(93336);
                alist.add(93337);
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("mA.mB.mC.aD", alist);
                mutMC.setOrReplace("mA.mB.mC.mD.fD1", 93331);
                mutMC.setOrReplace("mA.mB.mC.mD.fD2", 93332);
                // NOTE(review): srId1s again inside the srId2s section -- confirm.
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------mA.mB.mC.mD->null");
                _logger.info("      mA.mB.mC.aD->null");
                mutMD = MapRDBImpl.newMutation();
                mutMD.delete("mA.mB.mC.mD");
                mutMD.delete("mA.mB.mC.aD");
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId2s, mutMD, jsonTables);

                // ---- Section 3: mutations below mA.mB / mA.aB on the srId3s rows. ----
                _logger.info("------row1A3, row1B3, row2A3, row2B3, row3A3, row3B3, row4A3, row4B3");
                _logger.info("------mA.mB.fB1->9121, mA.mB.fB2->null, mA.mB.fB3->9123");
                _logger.info("      mA.aB[1]->9127, mA.aB[0]->null, mA.aB[2]->9128");
                DocumentMutation mutMB = MapRDBImpl.newMutation();
                mutMB.setOrReplace("mA.mB.fB1", 9121);
                mutMB.delete("mA.mB.fB2");
                mutMB.set("mA.mB.fB3", 9123);
                mutMB.setOrReplace("mA.aB[1]", 9127);
                mutMB.delete("mA.aB[0]");
                mutMB.set("mA.aB[2]", 91128);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId3s, mutMB, jsonTables);
                _logger.info("------mA.mB->911");
                _logger.info("      mA.aB->911");
                mutMB = MapRDBImpl.newMutation();
                mutMB.setOrReplace("mA.mB", 911);
                mutMB.setOrReplace("mA.aB", 911);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId3s, mutMB, jsonTables);
                _logger.info("------mA.mB->[926,927]");
                _logger.info("      mA.aB->{f1:921,f2:922}");
                alist = new ArrayList<>();
                alist.add(926);
                alist.add(927);
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("mA.mB", alist);
                mutMC.setOrReplace("mA.aB.fB1", 921);
                mutMC.setOrReplace("mA.aB.fB2", 922);
                // NOTE(review): srId1s inside the srId3s section -- confirm.
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------mA.mB->{fB1:931,fB2:932}");
                _logger.info("      mA.aB->[936,937]");
                alist = new ArrayList<>();
                alist.add(936);
                alist.add(937);
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("mA.aB", alist);
                mutMC.setOrReplace("mA.mB.fB1", 931);
                mutMC.setOrReplace("mA.mB.fB2", 932);
                // NOTE(review): srId1s inside the srId3s section -- confirm.
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------mA.mB->null");
                _logger.info("      mA.aB->null");
                mutMB = MapRDBImpl.newMutation();
                mutMB.delete("mA.mB");
                mutMB.delete("mA.aB");
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId3s, mutMB, jsonTables);

                // ---- Section 4: top-level field mutations on the srId4s rows, then row deletes. ----
                _logger.info("------row1A4, row1B4, row2A4, row2B4, row3A4, row3B4, row4A4, row4B4");
                _logger.info("------f1->91, f2->null, f3->93");
                _logger.info("      a[1]->97, a[0]->null, a[2]->98");
                DocumentMutation mutM = MapRDBImpl.newMutation();
                mutM.setOrReplace("f1", 91);
                mutM.delete("f2");
                mutM.set("f3", 93);
                mutM.setOrReplace("a[1]", 97);
                mutM.delete("a[0]");
                mutM.set("a[2]", 98);
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId4s, mutM, jsonTables);
                _logger.info("------f1->[96,97]");
                _logger.info("      a->{f1:91,f2:92}");
                alist = new ArrayList<>();
                alist.add(96);
                alist.add(97);
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("f1", alist);
                mutMC.setOrReplace("a.f1", 91);
                mutMC.setOrReplace("a.f2", 92);
                // NOTE(review): srId1s inside the srId4s section -- confirm.
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------a->[96,97]");
                _logger.info("      f1->{f1:91,f2:92}");
                alist = new ArrayList<>();
                alist.add(96);
                alist.add(97);
                mutMC = MapRDBImpl.newMutation();
                mutMC.setOrReplace("a", alist);
                mutMC.setOrReplace("f1.f1", 91);
                mutMC.setOrReplace("f1.f2", 92);
                // NOTE(review): srId1s inside the srId4s section -- confirm.
                TestCDPSOpenFormatJSONWithCluster.mutateRows(tableNames, srId1s, mutMC, jsonTables);
                _logger.info("------row->null");
                TestCDPSOpenFormatJSONWithCluster.deleteRows(srId4s, jsonTables);

                scfGetList = TestCDPSUtil.fetchStringData(changeRecordCount, jscfConsumer);
                mcfGetList = TestCDPSUtil.fetchStringData(changeRecordCount, jmcfConsumer);
                compareScfAndMcfRecords(scfGetList, mcfGetList, changeRecordCount);
            }
        }
    }

    /**
     * Parses the first {@code count} records of both lists and compares them pairwise, by
     * index, with {@code compareRecords}.
     *
     * @param scfRecords records fetched from the first topic
     * @param mcfRecords records fetched from the second topic
     * @param count      number of leading records to compare; both lists must hold at least this many
     * @throws IOException propagated from {@code compareRecords}
     */
    private static void compareScfAndMcfRecords(List<ConsumerRecord<byte[], String>> scfRecords, List<ConsumerRecord<byte[], String>> mcfRecords, int count) throws IOException {
        for (int i = 0; i < count; ++i) {
            Document docScf = MapRDB.newDocument(scfRecords.get(i).value());
            Document docMcf = MapRDB.newDocument(mcfRecords.get(i).value());
            TestCDPSOpenFormatJSONWithCluster.compareRecords(docScf, docMcf);
        }
    }

    private static enum VerifyMethod {
        VerifySimpleMap,
        VerifyReplaceRowMap,
        VerifyInsertRowMap,
        VerifyReplaceCFMap,
        VerifyInsertCFMap,
        VerifyRowDelete,
        VerifyFieldDelete,
        VerifyReplaceSimpleArray,
        VerifyInsertSimpleArray,
        VerifyReplaceSimpleField,
        VerifyInsertSimpleField,
        VerifyArrayInArrayFieldL1,
        VerifyArrayInArrayFieldL2,
        VerifyArrayInArrayFieldL3,
        VerifyArrayInArrayFieldL4,
        VerifyArrayInArrayFieldL5,
        VerifyArrayInArrayFieldL6,
        VerifyArrayInArrayFieldM1,
        VerifyArrayInArrayFieldM2,
        VerifyArrayInArrayFieldM3,
        VerifyArrayInArrayFieldM4,
        VerifyArrayInArrayFieldM5,
        VerifyArrayInArrayFieldM6,
        VerifyArrayInArrayFieldN1,
        VerifyArrayInArrayFieldN2,
        VerifyArrayInArrayFieldN3,
        VerifyArrayInArrayFieldN4,
        VerifyMixedFieldO1,
        VerifyMixedFieldO2,
        VerifyMixedFieldO3,
        VerifyMixedFieldO4,
        VerifyMixedFieldO5;

    }

    private static enum PutMethod {
        InsertMap,
        ReplaceMap,
        InsertArray,
        ReplaceArray,
        InsertField,
        ReplaceField,
        DeleteField;

    }
}

