/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.tests;

import com.mapr.db.FamilyDescriptor;
import com.mapr.db.Table;
import com.mapr.db.cdc.tests.JsonConsumer;
import com.mapr.db.cdc.tests.JsonLoader;
import com.mapr.db.cdc.tests.TestCDPSUtil;
import com.mapr.db.impl.AdminImpl;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.rowcol.DBDocumentImpl;
import com.mapr.db.tests.utils.DBTests;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.Document;
import org.ojai.FieldPath;
import org.ojai.KeyValue;
import org.ojai.Value;
import org.ojai.store.DocumentMutation;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeEvent;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.store.cdc.ChangeOp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class TestCDPSCLIWithCluster
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSCLIWithCluster.class);
    private static AdminImpl testAdmin = null;

    /**
     * Repairs a whitespace split that broke a {@code "left | right"} value into three tokens.
     *
     * <p>Every standalone {@code "|"} token is folded, together with the token that follows it,
     * into the value collected just before it, so {@code ["a", "|", "b", "c"]} becomes
     * {@code ["a|b", "c"]}. All other tokens are copied through unchanged.
     *
     * @param valToCorrect tokens produced by splitting an output line on whitespace
     * @return a new list with the separated values re-joined
     * @throws AssertionError if a {@code "|"} token is the first or the last token, i.e. it has
     *         no value on one of its sides
     */
    public static ArrayList<String> CorrectSpaceSplit(String[] valToCorrect) {
        System.out.println("before correct: " + Arrays.toString(valToCorrect));
        ArrayList<String> correctedVals = new ArrayList<String>(valToCorrect.length);
        int i = 0;
        while (i < valToCorrect.length) {
            String token = valToCorrect[i];
            if (!token.equals("|")) {
                correctedVals.add(token);
                ++i;
                continue;
            }
            if (i == 0 || i == valToCorrect.length - 1) {
                throw new AssertionError("separator \"|\" at index " + i
                        + " needs a value on both sides: " + Arrays.toString(valToCorrect));
            }
            // Extend the most recently collected value with the token AFTER the separator.
            // Addressing the list by its own size (not a separate counter) keeps this correct
            // when a line contains more than one separator.
            int last = correctedVals.size() - 1;
            correctedVals.set(last, correctedVals.get(last) + "|" + valToCorrect[i + 1]);
            // Skip both the separator and the token that was just merged.
            i += 2;
        }
        System.out.println("after correct: " + correctedVals.toString());
        return correctedVals;
    }

    /**
     * Prints a command's tabular output and asserts on the first data line that contains
     * {@code key}.
     *
     * <p>The first output line is treated as the header and split on whitespace. When both
     * {@code colToVerify} and {@code valToVerify} are given, each column name is looked up in the
     * header and the cell at that position of the matching line (after
     * {@link #CorrectSpaceSplit}) is compared with the corresponding value. Only the first line
     * containing {@code key} is examined.
     *
     * @param cmdretstr   raw command output, lines separated by {@code \n}
     * @param key         substring identifying the line of interest; {@code null} skips all checks
     * @param colToVerify header names to check, or {@code null} to only check for presence
     * @param valToVerify values paired index-by-index with {@code colToVerify}, or {@code null}
     * @param verifyexist {@code true}: a matching line must exist and its cells must equal the
     *                    values; {@code false}: no line may contain {@code key} (cells of a
     *                    matching line are additionally asserted to differ before that fails)
     */
    public static void VerifyCmdOutput(String cmdretstr, String key, String[] colToVerify, String[] valToVerify, boolean verifyexist) {
        System.out.println(cmdretstr);
        if (key == null) {
            return;
        }
        String[] outputLines = cmdretstr.split("\\n");
        int lineCount = outputLines.length;
        // An array length is never negative, so this check cannot fail.
        Assert.assertTrue(lineCount >= 0);

        boolean checkColumns = colToVerify != null && valToVerify != null;
        int[] indexes = null;
        if (checkColumns) {
            Assert.assertEquals((long) colToVerify.length, (long) valToVerify.length);
            String[] header = outputLines[0].split("\\s+");
            System.out.println("line0: " + Arrays.toString(header));
            List<String> headerCols = Arrays.asList(header);
            indexes = new int[colToVerify.length];
            for (int c = 0; c < indexes.length; ++c) {
                // indexOf yields -1 for a column missing from the header.
                indexes[c] = headerCols.indexOf(colToVerify[c]);
            }
        }
        System.out.println("Indexes: " + Arrays.toString(indexes));

        boolean found = false;
        // Line 0 is the header; stop at the first data line that mentions the key.
        for (int ln = 1; ln < lineCount && !found; ++ln) {
            String line = outputLines[ln];
            if (line.contains(key)) {
                if (checkColumns) {
                    ArrayList<String> cells = CorrectSpaceSplit(line.split("\\s+"));
                    System.out.println("line " + ln + " len:" + cells.size() + "items:" + cells);
                    for (int v = 0; v < valToVerify.length; ++v) {
                        int column = indexes[v];
                        Assert.assertTrue(column >= 0);
                        String expected = valToVerify[v];
                        String actual = cells.get(column);
                        if (verifyexist) {
                            Assert.assertEquals((Object) expected, (Object) actual);
                        } else {
                            Assert.assertNotEquals((Object) expected, (Object) actual);
                        }
                    }
                }
                found = true;
            }
        }

        if (verifyexist) {
            Assert.assertTrue(found);
        } else {
            Assert.assertTrue(!found);
        }
    }

    /**
     * Runs {@code maprcli stream info -path <tableName>} and verifies, through
     * {@link #VerifyCmdOutput}, that a line containing {@code tableName} exists and carries the
     * given column values.
     *
     * @param tableName path of the stream, also used as the line key
     * @param colChglog column names to check, or {@code null}
     * @param valChglog expected values for {@code colChglog}, or {@code null}
     */
    public static void verifyInfoStreamTable(String tableName, String[] colChglog, String[] valChglog) throws Exception {
        final String command = "maprcli stream info -path " + tableName;
        System.out.println("\n" + command);
        final String output = DBTests.ExecuteShellCmd(command);
        VerifyCmdOutput(output, tableName, colChglog, valChglog, true);
    }

    /**
     * Runs {@code maprcli stream topic list -path <tableName>} and verifies, through
     * {@link #VerifyCmdOutput}, that a line containing {@code topicname} exists and carries the
     * given column values.
     *
     * @param tableName path of the stream whose topics are listed
     * @param topicname substring identifying the topic's line
     * @param colChglog column names to check, or {@code null}
     * @param valChglog expected values for {@code colChglog}, or {@code null}
     */
    public static void verifyListTopicStreamTable(String tableName, String topicname, String[] colChglog, String[] valChglog) throws Exception {
        final String command = "maprcli stream topic list -path " + tableName;
        System.out.println("\n" + command);
        final String output = DBTests.ExecuteShellCmd(command);
        VerifyCmdOutput(output, topicname, colChglog, valChglog, true);
    }

    /**
     * Runs {@code maprcli table cf list -path <tableName>} and hands the output to
     * {@link #VerifyCmdOutput}, keyed on {@code cfpath}.
     *
     * @param tableName   path of the table whose column families are listed
     * @param cfpath      substring identifying the column family's line
     * @param colChglog   column names to check, or {@code null}
     * @param valChglog   expected values for {@code colChglog}, or {@code null}
     * @param verifyexist forwarded unchanged to {@link #VerifyCmdOutput}
     */
    public static void verifyTableCFList(String tableName, String cfpath, String[] colChglog, String[] valChglog, boolean verifyexist) throws Exception {
        final String command = "maprcli table cf list -path " + tableName;
        System.out.println("\n" + command);
        final String output = DBTests.ExecuteShellCmd(command);
        VerifyCmdOutput(output, cfpath, colChglog, valChglog, verifyexist);
    }

    /**
     * Runs {@code maprcli stream edit -path <tableName> <paraName> <paraVal>}.
     *
     * @param tableName path of the stream to edit
     * @param paraName  option name as passed on the command line (e.g. {@code -compression})
     * @param paraVal   option value
     * @return the return code reported by {@code DBTests.ExecuteShellCmdAndGetReturnCode}
     */
    public static int editStreamTable(String tableName, String paraName, String paraVal) throws Exception {
        StringBuilder command = new StringBuilder("maprcli stream edit -path ");
        command.append(tableName).append(" ").append(paraName).append(" ").append(paraVal);
        System.out.println("\n" + command);
        return DBTests.ExecuteShellCmdAndGetReturnCode(command.toString());
    }

    /**
     * Runs {@code maprcli table changelog resume} for the given changelog, prints the output, and
     * then checks via {@link #verifyListCLG} that the changelog's {@code paused} column reads
     * {@code false}.
     *
     * @param topicFullName changelog passed as {@code -changelog}
     * @param srcTableName  source table passed as {@code -path}
     */
    public static void resumeCLG(String topicFullName, String srcTableName) throws Exception {
        String resumeOutput = DBTests.ExecuteShellCmd(
                "maprcli table changelog resume -path " + srcTableName + " -changelog " + topicFullName);
        System.out.println(resumeOutput);
        verifyListCLG(srcTableName, topicFullName, new String[]{"paused"}, new String[]{"false"});
    }

    /**
     * Runs {@code maprcli table changelog pause} for the given changelog, prints the output, and
     * then checks via {@link #verifyListCLG} that the changelog's {@code paused} column reads
     * {@code true}.
     *
     * @param topicFullName changelog passed as {@code -changelog}
     * @param srcTableName  source table passed as {@code -path}
     */
    public static void pauseCLG(String topicFullName, String srcTableName) throws Exception {
        String pauseOutput = DBTests.ExecuteShellCmd(
                "maprcli table changelog pause -path " + srcTableName + " -changelog " + topicFullName);
        System.out.println(pauseOutput);
        verifyListCLG(srcTableName, topicFullName, new String[]{"paused"}, new String[]{"true"});
    }

    /**
     * Runs {@code maprcli table changelog info -changelog <topicFullName>}, prints the output,
     * and verifies through {@link #VerifyCmdOutput} that a line containing {@code srcTableName}
     * exists and carries the given column values.
     *
     * @param topicFullName changelog to query
     * @param srcTableName  substring identifying the expected line
     * @param colChglog     column names to check, or {@code null}
     * @param valChglog     expected values for {@code colChglog}, or {@code null}
     */
    public static void verifyInfoCLG(String topicFullName, String srcTableName, String[] colChglog, String[] valChglog) throws Exception {
        String infoOutput = DBTests.ExecuteShellCmd("maprcli table changelog info -changelog " + topicFullName);
        System.out.println(infoOutput);
        VerifyCmdOutput(infoOutput, srcTableName, colChglog, valChglog, true);
    }

    /**
     * Runs {@code maprcli table changelog list -path <tableName>}, prints the output, and
     * verifies through {@link #VerifyCmdOutput} that a line containing {@code key} exists and
     * carries the given column values.
     *
     * @param tableName source table whose changelogs are listed
     * @param key       substring identifying the expected line
     * @param colChglog column names to check, or {@code null}
     * @param valChglog expected values for {@code colChglog}, or {@code null}
     */
    public static void verifyListCLG(String tableName, String key, String[] colChglog, String[] valChglog) throws Exception {
        String listOutput = DBTests.ExecuteShellCmd("maprcli table changelog list -path " + tableName);
        System.out.println(listOutput);
        VerifyCmdOutput(listOutput, key, colChglog, valChglog, true);
    }

    /**
     * Prints the expected run-time banner and creates the shared admin handle used by the tests.
     */
    @BeforeClass
    public static void startupBeforeClass() throws IOException {
        // One literal per banner line; the pieces concatenate to a single compile-time constant.
        final String banner =
                "--- On single node cluster without other workload, these tests takes total about 20 minutes, please wait ---\n"
                + "TestCDPSCLIWithCluster#testCLGAddPause   ------ 10 minutes\n"
                + "TestCDPSCLIWithCluster#testCLGAddColumns ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testStream        ------ 2 minutes\n"
                + "TestCDPSCLIWithCluster#testStress1       ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testStress2       ------ 3 minutes\n";
        System.out.println(banner);
        testAdmin = (AdminImpl) MapRDBImpl.newAdmin();
    }

    /**
     * Closes the shared admin handle, if one was created, and prints a completion marker.
     *
     * <p>The null check matters because this method still runs when {@code startupBeforeClass}
     * failed before assigning {@code testAdmin}; without it the cleanup would add a
     * {@link NullPointerException} on top of the real setup failure.
     */
    @AfterClass
    public static void cleanupAfterClass() throws IOException, Exception {
        if (testAdmin != null) {
            try {
                testAdmin.close();
            } finally {
                // Drop the reference even if close() fails so a stale handle is never reused.
                testAdmin = null;
            }
        }
        System.out.println("Done!");
    }

    /**
     * Exercises {@code maprcli stream info/edit} on a regular stream and on a changelog stream,
     * then checks that a changelog replica can only be set up against the changelog stream.
     *
     * <p>Expectations encoded below: both streams accept a compression change; only the regular
     * stream accepts changes to {@code -autocreate} and {@code -defaultpartitions}.
     */
    @Test
    public void testStream() throws Exception {
        final String plainStream = "/tmp/streamTable1";
        final String changelogStream = "/tmp/chgTable1";
        TestCDPSUtil.replaceStreamTable(plainStream, false, 3);
        TestCDPSUtil.replaceStreamTable(changelogStream, true, 3);

        // Initial settings as reported by "stream info".
        verifyInfoStreamTable(plainStream,
                new String[]{"ischangelog", "defaultpartitions"},
                new String[]{"false", "3"});
        verifyInfoStreamTable(changelogStream,
                new String[]{"produceperm", "ischangelog", "defaultpartitions"},
                new String[]{"u:mapr", "true", "3"});

        // Compression may be edited on both kinds of stream.
        int editResult = editStreamTable(plainStream, "-compression", "zlib");
        Assert.assertEquals((long) editResult, 0L);
        editResult = editStreamTable(changelogStream, "-compression", "zlib");
        Assert.assertEquals((long) editResult, 0L);

        // autocreate: accepted on the regular stream, rejected on the changelog stream.
        editResult = editStreamTable(plainStream, "-autocreate", "false");
        Assert.assertEquals((long) editResult, 0L);
        editResult = editStreamTable(changelogStream, "-autocreate", "false");
        Assert.assertNotEquals((long) editResult, 0L);

        // defaultpartitions: accepted on the regular stream, rejected on the changelog stream.
        editResult = editStreamTable(plainStream, "-defaultpartitions", "4");
        Assert.assertEquals((long) editResult, 0L);
        editResult = editStreamTable(changelogStream, "-defaultpartitions", "4");
        Assert.assertNotEquals((long) editResult, 0L);

        // Settings after the edits: rejected edits must have left the changelog stream untouched.
        final String[] editedColumns = new String[]{"ischangelog", "defaultpartitions", "compression", "autocreate"};
        verifyInfoStreamTable(plainStream, editedColumns, new String[]{"false", "4", "zlib", "false"});
        verifyInfoStreamTable(changelogStream,
                new String[]{"ischangelog", "defaultpartitions", "compression", "autocreate"},
                new String[]{"true", "3", "zlib", "true"});

        final String jsonTablePath = "/tmp/jsonTable1";
        final String topicName = "jsonTable1";
        final String changelogTopic = changelogStream + ":jsonTable1";
        final String plainStreamTopic = plainStream + ":jsonTable1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTablePath);

        // Replica setup must fail against the regular stream and succeed against the changelog stream.
        int setupResult = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePath, plainStream, plainStreamTopic);
        Assert.assertNotEquals((long) setupResult, 0L);
        setupResult = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePath, changelogStream, changelogTopic);
        Assert.assertEquals((long) setupResult, 0L);

        // Presence-only checks (no column values) in the three listing commands.
        verifyListCLG(jsonTablePath, changelogTopic, null, null);
        verifyListTopicStreamTable(changelogStream, topicName, null, null);
        verifyInfoCLG(changelogTopic, jsonTablePath, null, null);
    }

    /**
     * Verifies a change record whose header operation is {@code SET} and whose first node is a
     * {@code SET} of the whole document {@code rec} (no field path).
     *
     * @param strRowId expected row id, checked by {@code TestCDPSUtil.verifyHead}
     * @param cdr      change record under test
     * @param rec      document the first change node is compared against
     * @param isDebug  when {@code true}, the change record is printed first
     */
    public static void verifyColumnDataThroughIter1(String strRowId, ChangeDataRecord cdr, Document rec, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        System.out.println("-----input rec " + rec);
        TestCDPSUtil.verifyHead(cdr, true, strRowId, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.SET);
        final long recordOpTime = cdr.getOpTimestamp();
        Iterator<?> entries = cdr.iterator();
        // Only the first node is inspected; its op time must equal the record's.
        ChangeNode firstNode = (ChangeNode) ((KeyValue) entries.next()).getValue();
        TestCDPSUtil.assertNodeEquals(firstNode, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.SET, recordOpTime, null, (Value) rec);
    }

    /**
     * Verifies a change record whose header operation is {@code MERGE} and whose first three
     * nodes address the paths {@code a.b.c}, {@code g} and {@code x.y.z.w}, in that order.
     *
     * @param strRowId expected row id, checked by {@code TestCDPSUtil.verifyHead}
     * @param cdr      change record under test
     * @param rec      document supplying the expected value at each of the three paths
     * @param isDebug  when {@code true}, the change record is printed first
     * @param cops     expected operation of the first, second and third node respectively
     */
    public static void verifyColumnDataThroughIter2(String strRowId, ChangeDataRecord cdr, Document rec, boolean isDebug, ChangeOp[] cops) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        System.out.println("-----input rec " + rec);
        TestCDPSUtil.verifyHead(cdr, true, strRowId, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.MERGE);
        final long recordOpTime = cdr.getOpTimestamp();
        Iterator<?> entries = cdr.iterator();
        final String[] expectedPaths = {"a.b.c", "g", "x.y.z.w"};
        for (int k = 0; k < expectedPaths.length; ++k) {
            ChangeNode node = (ChangeNode) ((KeyValue) entries.next()).getValue();
            TestCDPSUtil.assertNodeEquals(node, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, cops[k], recordOpTime, expectedPaths[k], rec.getValue(expectedPaths[k]));
        }
    }

    /**
     * Verifies a change record whose header operation is {@code MERGE} and whose leading nodes
     * are {@code SET} operations on the given field paths, in the given order.
     *
     * @param strRowId   expected row id, checked by {@code TestCDPSUtil.verifyHead}
     * @param cdr        change record under test
     * @param rec        document supplying the expected value at each field path
     * @param isDebug    when {@code true}, the change record is printed first
     * @param fieldnames field paths expected for consecutive nodes of the record
     */
    public static void verifyColumnDataThroughIter3(String strRowId, ChangeDataRecord cdr, Document rec, boolean isDebug, String[] fieldnames) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        System.out.println("-----input rec " + rec);
        TestCDPSUtil.verifyHead(cdr, true, strRowId, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.MERGE);
        final long recordOpTime = cdr.getOpTimestamp();
        Iterator<?> entries = cdr.iterator();
        for (String fieldname : fieldnames) {
            ChangeNode node = (ChangeNode) ((KeyValue) entries.next()).getValue();
            TestCDPSUtil.assertNodeEquals(node, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.SET, recordOpTime, fieldname, rec.getValue(fieldname));
        }
    }

    /**
     * Checks that a changelog created in the paused state delivers nothing until it is resumed,
     * and that pausing it again holds back later writes until the next resume.
     *
     * <p>Sequence: write 20 rows, add the changelog (the {@code paused} column must read
     * {@code true}), expect 0 records; resume, expect 20; pause, rewrite 10 rows, expect 0;
     * resume, expect 10.
     */
    @Test
    public void testCLGAddPause() throws Exception {
        String jsrc1 = "/tmp/jsrc1";
        String chglogdst1 = "/tmp/chglogdst1";
        String topicFullName = chglogdst1 + ":jsrc1";
        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        Table jsonTable = MapRDBImpl.getTable(jsrc1);
        try {
            DBDocumentImpl rec = new DBDocumentImpl();
            rec.set("dummyf1", 10).set("dummyf2", 20);
            // Rows written before the changelog exists.
            for (int i = 0; i < 20; ++i) {
                jsonTable.insertOrReplace("row" + i, (Document)rec);
                jsonTable.flush();
            }
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, true, null);
            String[] colChglog = new String[]{"paused"};
            String[] valChglog = new String[]{"true"};
            TestCDPSCLIWithCluster.verifyListCLG(jsrc1, topicFullName, colChglog, valChglog);

            KafkaConsumer<byte[], ChangeDataRecord> consumerScf = TestCDPSUtil.startConsumer(topicFullName);
            // While paused, nothing may arrive.
            List<ConsumerRecord<byte[], ChangeDataRecord>> getListScf = TestCDPSUtil.fetchChangeDataWithBreak(20, consumerScf, 30);
            Assert.assertEquals(0L, (long)getListScf.size());
            // After resume, all 20 pre-existing rows are expected.
            TestCDPSCLIWithCluster.resumeCLG(topicFullName, jsrc1);
            getListScf = TestCDPSUtil.fetchChangeDataWithBreak(20, consumerScf, 30);
            Assert.assertEquals(20L, (long)getListScf.size());

            // Second round: pause, write 10 rows, expect nothing until resumed.
            TestCDPSCLIWithCluster.pauseCLG(topicFullName, jsrc1);
            for (int i = 0; i < 10; ++i) {
                jsonTable.insertOrReplace("row" + i, (Document)rec);
                jsonTable.flush();
            }
            getListScf = TestCDPSUtil.fetchChangeDataWithBreak(10, consumerScf, 30);
            Assert.assertEquals(0L, (long)getListScf.size());
            TestCDPSCLIWithCluster.resumeCLG(topicFullName, jsrc1);
            getListScf = TestCDPSUtil.fetchChangeDataWithBreak(10, consumerScf, 30);
            Assert.assertEquals(10L, (long)getListScf.size());
        } finally {
            // Release the table even when an assertion above fails.
            jsonTable.close();
        }
    }

    /**
     * Sets up a changelog with the column list {@code a.b.c,g,x.y.z.w} and compares the change
     * records received for inserts and updates against hand-built expected documents.
     *
     * <p>{@code presetValuesScf} collects the expected documents in the order the change records
     * are checked: indexes 0-1 for the rows written before the changelog is added (row1, row3),
     * indexes 2-4 for the inserts of row4-row6, and indexes 5-6 for the updates of row4 and row6.
     */
    @Test
    public void testCLGAddColumns() throws Exception {
        ArrayList<DBDocumentImpl> presetValuesScf = new ArrayList<DBDocumentImpl>();
        String jsrc1 = "/tmp/jcolsrc1";
        String chglogdst1 = "/tmp/chglogcoldst1";
        String topicFullName = chglogdst1 + ":jcolsrc1";
        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        // row1: written with fields both inside and outside the column list used below.
        String rowid = "row1";
        DBDocumentImpl rec = new DBDocumentImpl();
        rec.set("a.b.c.f1", "abcv1").set("a.b.c.d.f1", "abcdv1").set("a.e.f1", "aev1").set("g.f1", "gv1").set("h.f1", "gv1").set("c.d.f1", "cdv1").set("x.y.z.w.f1", "xyzwv1").set("x.y.z.f1", "xyzv1");
        Table jsonTable = MapRDBImpl.getTable((String)jsrc1);
        jsonTable.insertOrReplace(rowid, (Document)rec);
        jsonTable.flush();
        // Reuse the same object as the expectation: only fields under a.b.c, g and x.y.z.w remain.
        rec.empty();
        rec.set("a.b.c.f1", "abcv1").set("a.b.c.d.f1", "abcdv1").set("g.f1", "gv1").set("x.y.z.w.f1", "xyzwv1");
        presetValuesScf.add(rec);
        // row2: has no field under the listed paths; no expectation is recorded for it.
        rowid = "row2";
        rec = new DBDocumentImpl();
        rec.set("m.n.f2", "mnv2").set("p.q.r.f2", "pqrv2").set("h.f2", "pqrv2");
        jsonTable.insertOrReplace(rowid, (Document)rec);
        jsonTable.flush();
        // row3: every field is under a listed path, so the written document is the expectation.
        rowid = "row3";
        rec = new DBDocumentImpl();
        rec.set("a.b.c.f3", "abcv3").set("g.f3", "gv3");
        jsonTable.insertOrReplace(rowid, (Document)rec);
        jsonTable.flush();
        presetValuesScf.add(rec);
        // Add the changelog with the column list, then confirm it through the CLI listings.
        String replPaths = "a.b.c,g,x.y.z.w";
        TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, replPaths);
        TestCDPSCLIWithCluster.verifyInfoCLG(topicFullName, jsrc1, null, null);
        String[] colChglog = new String[]{"Columns"};
        String[] valChglog = new String[]{"a.b.c,g,x.y.z.w"};
        TestCDPSCLIWithCluster.verifyListCLG(jsrc1, chglogdst1, colChglog, valChglog);
        List<ConsumerRecord<byte[], ChangeDataRecord>> getListScf = null;
        ConsumerRecord<byte[], ChangeDataRecord> crec = null;
        KafkaConsumer<byte[], ChangeDataRecord> consumerScf = null;
        ChangeDataRecord cdr = null;
        Document putrec = null;
        consumerScf = TestCDPSUtil.startConsumer(topicFullName);
        // Only two records are fetched for the three pre-existing rows: row1 and row3.
        getListScf = TestCDPSUtil.fetchChangeData(2, consumerScf);
        int putId = 0;
        boolean isDebug = true;
        crec = getListScf.get(putId);
        cdr = (ChangeDataRecord)crec.value();
        putrec = (Document)presetValuesScf.get(putId);
        TestCDPSCLIWithCluster.verifyColumnDataThroughIter1("row1", cdr, putrec, isDebug);
        crec = getListScf.get(++putId);
        cdr = (ChangeDataRecord)crec.value();
        putrec = (Document)presetValuesScf.get(putId);
        TestCDPSCLIWithCluster.verifyColumnDataThroughIter1("row3", cdr, putrec, isDebug);
        // row4 insert: same field mix as row1; expectation keeps only the listed paths.
        rowid = "row4";
        rec = new DBDocumentImpl();
        rec.set("a.b.c.f4", "abcv4").set("a.b.c.d.f4", "abcdv4").set("a.e.f4", "aev4").set("g.f4", "gv4").set("h.f4", "gv4").set("c.d.f4", "cdv4").set("x.y.z.w.f4", "xyzwv4").set("x.y.z.f4", "xyzv4");
        jsonTable.insertOrReplace(rowid, (Document)rec);
        jsonTable.flush();
        rec.empty();
        rec.set("a.b.c.f4", "abcv4").set("a.b.c.d.f4", "abcdv4").set("g.f4", "gv4").set("x.y.z.w.f4", "xyzwv4");
        presetValuesScf.add(rec);
        // row5 insert: no field under a listed path; the expectation holds null at all three
        // paths and is later checked with DELETE operations.
        rowid = "row5";
        rec = new DBDocumentImpl();
        rec.set("m.n.f5", "mnv5").set("p.q.r.f5", "pqrv5").set("h.f5", "pqrv5");
        jsonTable.insertOrReplace(rowid, (Document)rec);
        jsonTable.flush();
        rec.empty();
        rec.setNull("a.b.c").setNull("g").setNull("x.y.z.w");
        presetValuesScf.add(rec);
        // row6 insert: nothing under "g", so the expectation has null there (checked as DELETE).
        rowid = "row6";
        rec = new DBDocumentImpl();
        rec.set("a.b.c.f6", "abcv6").set("x.y.z.w.f6", "xyzwv6");
        jsonTable.insertOrReplace(rowid, (Document)rec);
        jsonTable.flush();
        rec.empty();
        rec.set("a.b.c.f6", "abcv6").setNull("g").set("x.y.z.w.f6", "xyzwv6");
        presetValuesScf.add(rec);
        // row4 update: touches a.b.c (listed) and c.d (not listed); only the former is expected.
        DocumentMutation mutation = null;
        rowid = "row4";
        mutation = MapRDBImpl.newMutation();
        mutation.set("a.b.c.f4A", "abcv4A").set("c.d.f4A", "c.dv4A");
        jsonTable.update(rowid, mutation);
        jsonTable.flush();
        rec = new DBDocumentImpl();
        rec.empty();
        rec.set("a.b.c.f4A", "abcv4A");
        presetValuesScf.add(rec);
        // row5 update: touches only unlisted paths; no expectation is recorded for it.
        rowid = "row5";
        mutation = MapRDBImpl.newMutation();
        mutation.set("w.x.f5A", "wxv5A").set("map1.f5A", "map1v5A").set("apple.f5A", "applev5A");
        jsonTable.update(rowid, mutation);
        jsonTable.flush();
        // row6 update: both fields are under listed paths.
        rowid = "row6";
        mutation = MapRDBImpl.newMutation();
        mutation.set("g.map.f6A", "gmapv6A").set("x.y.z.w.f6A", "xyzwv6A");
        jsonTable.update(rowid, mutation);
        jsonTable.flush();
        rec = new DBDocumentImpl();
        rec.empty();
        rec.set("g.map.f6A", "gmapv6A").set("x.y.z.w.f6A", "xyzwv6A");
        presetValuesScf.add(rec);
        for (int i = 0; i < presetValuesScf.size(); ++i) {
            System.out.println("input rec" + i + ": " + presetValuesScf.get(i));
        }
        // Five records are fetched: three inserts (row4-row6) and two updates (row4, row6).
        // Fetched index k pairs with presetValuesScf index k + 2, because slots 0-1 belong to
        // the records already verified above.
        getListScf = TestCDPSUtil.fetchChangeData(5, consumerScf);
        putId = 0;
        crec = getListScf.get(putId);
        cdr = (ChangeDataRecord)crec.value();
        putrec = (Document)presetValuesScf.get(putId + 2);
        TestCDPSCLIWithCluster.verifyColumnDataThroughIter2("row4", cdr, putrec, isDebug, new ChangeOp[]{ChangeOp.SET, ChangeOp.SET, ChangeOp.SET});
        crec = getListScf.get(++putId);
        cdr = (ChangeDataRecord)crec.value();
        putrec = (Document)presetValuesScf.get(putId + 2);
        TestCDPSCLIWithCluster.verifyColumnDataThroughIter2("row5", cdr, putrec, isDebug, new ChangeOp[]{ChangeOp.DELETE, ChangeOp.DELETE, ChangeOp.DELETE});
        crec = getListScf.get(++putId);
        cdr = (ChangeDataRecord)crec.value();
        putrec = (Document)presetValuesScf.get(putId + 2);
        TestCDPSCLIWithCluster.verifyColumnDataThroughIter2("row6", cdr, putrec, isDebug, new ChangeOp[]{ChangeOp.SET, ChangeOp.DELETE, ChangeOp.SET});
        crec = getListScf.get(++putId);
        cdr = (ChangeDataRecord)crec.value();
        putrec = (Document)presetValuesScf.get(putId + 2);
        TestCDPSCLIWithCluster.verifyColumnDataThroughIter3("row4", cdr, putrec, isDebug, new String[]{"a.b.c.f4A"});
        crec = getListScf.get(++putId);
        cdr = (ChangeDataRecord)crec.value();
        putrec = (Document)presetValuesScf.get(putId + 2);
        TestCDPSCLIWithCluster.verifyColumnDataThroughIter3("row6", cdr, putrec, isDebug, new String[]{"g.map.f6A", "x.y.z.w.f6A"});
        jsonTable.close();
    }

    /**
     * Checks that change records keep carrying the full written document while column families
     * of the source table are added, renamed and deleted.
     *
     * <p>Steps: create the table with family {@code cf2} at {@code c.d}; add family {@code cf3}
     * at {@code e.f}; alter {@code cf2} to {@code cf22}; write two rows, delete {@code cf3}, and
     * only then fetch their change records. After every step a row is written and its change
     * record is compared with the written document.
     */
    @Test
    public void testCLGChangeColumns() throws Exception {
        String jsrc1 = "/tmp/jcolsrc2";
        String chglogdst1 = "/tmp/chglogcoldst2";
        String topicFullName = chglogdst1 + ":jcolsrc2";
        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        HashMap<String, String> cfPath = new HashMap<String, String>();
        cfPath.put("cf2", "c.d");
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, jsrc1, cfPath);
        String rowid = "row1";
        DBDocumentImpl rec = new DBDocumentImpl();
        rec.set("a.b.m1.data1", "abmd1").set("c.d.m1.int1", 101);
        Table jsonTable = MapRDBImpl.getTable(jsrc1);
        try {
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, null);
            TestCDPSCLIWithCluster.verifyInfoCLG(topicFullName, jsrc1, null, null);
            System.out.println("input " + rowid + ": " + rec);
            boolean isDebug = true;
            KafkaConsumer<byte[], ChangeDataRecord> consumerScf = TestCDPSUtil.startConsumer(topicFullName);
            List<ConsumerRecord<byte[], ChangeDataRecord>> getListScf = TestCDPSUtil.fetchChangeData(1, consumerScf);
            ChangeDataRecord cdr = getListScf.get(0).value();
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, (Document)rec, isDebug);

            // Step 1: add column family cf3 at e.f, then write a row that uses it.
            FieldPath P_CF3 = FieldPath.parseFrom("e.f");
            FamilyDescriptor familyDesc = MapRDBImpl.newFamilyDescriptor().setName("cf3").setJsonFieldPath(P_CF3).setCompression(FamilyDescriptor.Compression.None);
            Path jsrcPath = new Path(jsrc1);
            testAdmin.addFamily(jsrcPath, familyDesc);
            rowid = "row2";
            rec = new DBDocumentImpl();
            rec.set("a.b.m1.data2", "abm1d2").set("c.d.m1.int2", 102).set("e.f.str2", "efs2");
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            System.out.println("input " + rowid + ": " + rec);
            getListScf = TestCDPSUtil.fetchChangeData(1, consumerScf);
            cdr = getListScf.get(0).value();
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, (Document)rec, isDebug);
            System.out.println("--- Passed the CF add test ---");

            // Step 2: rename the family at c.d from cf2 to cf22 and confirm through the CLI.
            String[] colChglog = new String[]{"cfname"};
            String[] valChglog = new String[]{"cf2"};
            TestCDPSCLIWithCluster.verifyTableCFList(jsrc1, "c.d", colChglog, valChglog, true);
            FieldPath P_CF2 = FieldPath.parseFrom("c.d");
            familyDesc = MapRDBImpl.newFamilyDescriptor().setName("cf22").setJsonFieldPath(P_CF2).setCompression(FamilyDescriptor.Compression.None);
            testAdmin.alterFamily(jsrcPath, "cf2", familyDesc);
            colChglog = new String[]{"cfname"};
            valChglog = new String[]{"cf22"};
            TestCDPSCLIWithCluster.verifyTableCFList(jsrc1, "c.d", colChglog, valChglog, true);
            rowid = "row3";
            rec = new DBDocumentImpl();
            rec.set("a.b.m1.data3", "abm1d3").set("c.d.m1.int3", 103).set("e.f.str3", "efs3");
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            System.out.println("input " + rowid + ": " + rec);
            getListScf = TestCDPSUtil.fetchChangeData(1, consumerScf);
            cdr = getListScf.get(0).value();
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, (Document)rec, isDebug);
            // This message previously repeated the "CF add" text from step 1.
            System.out.println("--- Passed the CF alter test ---");

            // Step 3: write two rows, delete cf3, and only then fetch their change records.
            rowid = "row4";
            rec = new DBDocumentImpl();
            rec.set("a.b.m1.data4", "abm1d4").set("c.d.m1.int4", 104).set("e.f.str4", "efs4");
            jsonTable.insertOrReplace(rowid, (Document)rec);
            System.out.println("input " + rowid + ": " + rec);
            String rowid2 = "row5";
            DBDocumentImpl rec2 = new DBDocumentImpl();
            rec2.set("a.b.m1.data5", "abm1d5").set("c.d.m1.int5", 105);
            jsonTable.insertOrReplace(rowid2, (Document)rec2);
            jsonTable.flush();
            System.out.println("input " + rowid2 + ": " + rec2);
            colChglog = new String[]{"cfname"};
            valChglog = new String[]{"cf3"};
            TestCDPSCLIWithCluster.verifyTableCFList(jsrc1, "e.f", colChglog, valChglog, true);
            testAdmin.deleteFamily(jsrcPath, "cf3");
            TestCDPSCLIWithCluster.verifyTableCFList(jsrc1, "e.f", colChglog, valChglog, false);
            getListScf = TestCDPSUtil.fetchChangeData(2, consumerScf);
            cdr = getListScf.get(0).value();
            TestCDPSUtil.printCDRec(cdr);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, (Document)rec, isDebug);
            cdr = getListScf.get(1).value();
            TestCDPSUtil.printCDRec(cdr);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid2, cdr, (Document)rec2, isDebug);
        } finally {
            // Release the table even when a verification above fails.
            jsonTable.close();
        }
    }

    /**
     * Writes 1000 rows from a single thread and verifies one change record per row.
     *
     * <p>The expected document for each record is rebuilt from the numeric suffix of the
     * record's own row id, so the check does not depend on the order the records arrive in.
     */
    @Test
    public void testStress1() throws Exception {
        final int rowCount = 1000;
        String jsrc1 = "/tmp/jstresssrc1";
        String chglogdst1 = "/tmp/chglogstressdst1";
        String topicFullName = chglogdst1 + ":jstresssrc1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        Table jsonTable = MapRDBImpl.getTable(jsrc1);
        try {
            TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, null);
            TestCDPSCLIWithCluster.verifyInfoCLG(topicFullName, jsrc1, null, null);
            boolean isDebug = false;
            KafkaConsumer<byte[], ChangeDataRecord> consumerScf = TestCDPSUtil.startConsumer(topicFullName);
            for (int i = 0; i < rowCount; ++i) {
                DBDocumentImpl rec = new DBDocumentImpl();
                rec.set("a.b.m1.data", "abm1d" + i).set("c.d.m1.int", i).set("e.f.str", "efs" + i);
                jsonTable.insertOrReplace("row" + i, (Document)rec);
            }
            List<ConsumerRecord<byte[], ChangeDataRecord>> getListScf = TestCDPSUtil.fetchChangeData(rowCount, consumerScf);
            for (int i = 0; i < rowCount; ++i) {
                ChangeDataRecord cdr = getListScf.get(i).value();
                String rowid = cdr.getId().getString();
                // Recover the row number from the id ("row<N>") to rebuild the expected document.
                String idx = rowid.substring("row".length());
                DBDocumentImpl rec = new DBDocumentImpl();
                rec.set("a.b.m1.data", "abm1d" + idx).set("c.d.m1.int", Integer.parseInt(idx)).set("e.f.str", "efs" + idx);
                TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, (Document)rec, isDebug);
            }
        } finally {
            // Release the table even when a verification above fails.
            jsonTable.close();
        }
    }

    /**
     * Runs one {@code JsonConsumer} thread against four concurrent {@code JsonLoader} threads,
     * each given a 500-row slice of the range 0-2000, and waits for all of them to finish.
     */
    @Test
    public void testStress2() throws Exception {
        final int RowCount = 2000;
        final int loaderCount = 4;
        final int rowsPerLoader = 500;
        final String jsrc1 = "/tmp/jstresssrc2";
        final String chglogdst1 = "/tmp/chglogstressdst2";
        final String topicFullName = chglogdst1 + ":jstresssrc2";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        Table jsonTable = MapRDBImpl.getTable(jsrc1);
        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, null);
        verifyInfoCLG(topicFullName, jsrc1, null, null);

        // Build all tasks first, then all threads, before anything is started.
        JsonConsumer consumerTask = new JsonConsumer(topicFullName, RowCount);
        JsonLoader[] loaderTasks = new JsonLoader[loaderCount];
        for (int k = 0; k < loaderCount; ++k) {
            loaderTasks[k] = new JsonLoader(k * rowsPerLoader, (k + 1) * rowsPerLoader, jsonTable);
        }
        Thread consumerThread = new Thread((Runnable) consumerTask, "consumer1");
        Thread[] loaderThreads = new Thread[loaderCount];
        for (int k = 0; k < loaderCount; ++k) {
            // Names run loader21 .. loader24.
            loaderThreads[k] = new Thread((Runnable) loaderTasks[k], "loader2" + (k + 1));
        }

        // The consumer is started before the loaders and joined after them.
        consumerThread.start();
        for (Thread loader : loaderThreads) {
            loader.start();
        }
        for (Thread loader : loaderThreads) {
            loader.join();
        }
        consumerThread.join();
        jsonTable.close();
    }
}

