package com.mapr.db.cdc.tests;

import com.mapr.db.FamilyDescriptor;
import com.mapr.db.Table;
import com.mapr.db.cdc.tests.TestCDPSUtil;
import com.mapr.db.impl.AdminImpl;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.rowcol.DBDocumentImpl;
import com.mapr.db.tests.utils.DBTests;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.Document;
import org.ojai.FieldPath;
import org.ojai.KeyValue;
import org.ojai.Value;
import org.ojai.store.DocumentMutation;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeEvent;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.store.cdc.ChangeOp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/db/cdc/tests/TestCDPSCLIWithCluster.class */
public class TestCDPSCLIWithCluster extends BaseTest {
    // SLF4J logger for this class. None of the code in this class uses it; all output goes to System.out.
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSCLIWithCluster.class);
    // Shared admin handle: created in startupBeforeClass(), closed in cleanupAfterClass().
    private static AdminImpl testAdmin = null;

    /**
     * Repairs a whitespace-split token array in which a single value was torn apart around a
     * stand-alone {@code "|"} token (e.g. {@code ["a", "|", "b"]} becomes {@code ["a|b"]}).
     *
     * <p>Every stand-alone {@code "|"} is glued to the token before it and the token after it.
     * Consecutive separators chain onto the same value, so {@code ["a", "|", "b", "|", "c"]}
     * becomes {@code ["a|b|c"]}. All other tokens are copied through unchanged, in order.
     *
     * @param strArr tokens produced by splitting one output line on whitespace
     * @return a new list with the pipe-separated pieces re-joined
     * @throws AssertionError if a stand-alone {@code "|"} is the first or last token, i.e. it has
     *         no neighbour on one side (the same error type a failed JUnit assertion raises)
     */
    public static ArrayList<String> CorrectSpaceSplit(String[] strArr) {
        System.out.println("before correct: " + Arrays.toString(strArr));
        ArrayList<String> merged = new ArrayList<>();
        int idx = 0;
        while (idx < strArr.length) {
            if (!strArr[idx].equals("|")) {
                merged.add(strArr[idx]);
                idx++;
                continue;
            }
            if (idx == 0 || idx == strArr.length - 1) {
                throw new AssertionError(
                        "stand-alone \"|\" needs a token on both sides: " + Arrays.toString(strArr));
            }
            // Always merge into the LAST element of the result list: its index no longer tracks the
            // input index once an earlier merge has shrunk the output relative to the input.
            String left = merged.remove(merged.size() - 1);
            merged.add(left + "|" + strArr[idx + 1]);
            // Skip the separator and the right-hand token that was just consumed.
            idx += 2;
        }
        System.out.println("after correct: " + merged.toString());
        return merged;
    }

    /**
     * Prints a command's tabular text output and checks one data row of it.
     *
     * <p>The first line of {@code str} is treated as a whitespace-separated header. The remaining
     * lines are scanned in order and the first one that contains {@code str2} is the row under
     * test; scanning stops there. When both {@code strArr} and {@code strArr2} are given, each
     * named header column is looked up in that row (after {@link #CorrectSpaceSplit} has re-joined
     * pipe-separated tokens) and compared with the corresponding expected value.
     *
     * @param str     raw command output; always printed
     * @param str2    substring identifying the row to check; {@code null} means "print only"
     * @param strArr  header column names to check, or {@code null} to only test row presence
     * @param strArr2 values paired with {@code strArr} by index, or {@code null}
     * @param z       {@code true}: the row must exist and every listed column must equal its
     *                value; {@code false}: no row containing {@code str2} may exist
     */
    public static void VerifyCmdOutput(String str, String str2, String[] strArr, String[] strArr2, boolean z) {
        System.out.println(str);
        if (str2 == null) {
            return;
        }
        String[] lines = str.split("\\n");
        boolean checkColumns = strArr != null && strArr2 != null;
        int[] columnIndexes = null;
        if (checkColumns) {
            Assert.assertEquals("column name/value arrays differ in length", strArr.length, strArr2.length);
            // String.split drops trailing empty strings, so output consisting only of newlines
            // yields zero lines; treat that as an empty header instead of indexing lines[0].
            String[] header = lines.length > 0 ? lines[0].split("\\s+") : new String[0];
            System.out.println("line0: " + Arrays.toString(header));
            List<String> headerTokens = Arrays.asList(header);
            columnIndexes = new int[strArr.length];
            for (int c = 0; c < strArr.length; c++) {
                // -1 when the column is absent; only reported if a matching row is actually found.
                columnIndexes[c] = headerTokens.indexOf(strArr[c]);
            }
        }
        System.out.println("Indexes: " + Arrays.toString(columnIndexes));
        boolean found = false;
        for (int row = 1; row < lines.length; row++) {
            if (!lines[row].contains(str2)) {
                continue;
            }
            if (checkColumns) {
                ArrayList<String> tokens = CorrectSpaceSplit(lines[row].split("\\s+"));
                System.out.println("line " + row + " len:" + tokens.size() + "items:" + tokens);
                for (int c = 0; c < strArr2.length; c++) {
                    int tokenIndex = columnIndexes[c];
                    Assert.assertTrue("column '" + strArr[c] + "' not found in header", tokenIndex >= 0);
                    Assert.assertTrue("row " + row + " has no token for column '" + strArr[c] + "'",
                            tokenIndex < tokens.size());
                    String expected = strArr2[c];
                    String actual = tokens.get(tokenIndex);
                    if (z) {
                        Assert.assertEquals("column '" + strArr[c] + "'", expected, actual);
                    } else {
                        Assert.assertNotEquals("column '" + strArr[c] + "'", expected, actual);
                    }
                }
            }
            found = true;
            // Only the first matching row is checked; without this exit the scan would never
            // move past the matching line.
            break;
        }
        if (z) {
            Assert.assertTrue("no output row contains '" + str2 + "'", found);
        } else {
            Assert.assertFalse("unexpected output row contains '" + str2 + "'", found);
        }
    }

    /**
     * Runs {@code maprcli stream info -path <str>} and checks its output with
     * {@link #VerifyCmdOutput}, using the stream path itself as the row marker and requiring a
     * positive match.
     *
     * @param str     stream path passed to {@code -path}
     * @param strArr  header column names to check, or {@code null}
     * @param strArr2 expected values for those columns, or {@code null}
     */
    public static void verifyInfoStreamTable(String str, String[] strArr, String[] strArr2) throws Exception {
        final String command = "maprcli stream info -path " + str;
        System.out.println("\n" + command);
        final String output = DBTests.ExecuteShellCmd(command);
        VerifyCmdOutput(output, str, strArr, strArr2, true);
    }

    /**
     * Runs {@code maprcli stream topic list -path <str>} and checks the row containing
     * {@code str2} with {@link #VerifyCmdOutput}, requiring a positive match.
     *
     * @param str     stream path passed to {@code -path}
     * @param str2    substring identifying the output row to check
     * @param strArr  header column names to check, or {@code null}
     * @param strArr2 expected values for those columns, or {@code null}
     */
    public static void verifyListTopicStreamTable(String str, String str2, String[] strArr, String[] strArr2) throws Exception {
        final String command = "maprcli stream topic list -path " + str;
        System.out.println("\n" + command);
        final String output = DBTests.ExecuteShellCmd(command);
        VerifyCmdOutput(output, str2, strArr, strArr2, true);
    }

    /**
     * Runs {@code maprcli table cf list -path <str>} and hands the output to
     * {@link #VerifyCmdOutput} with the caller-supplied expectation flag.
     *
     * @param str     table path passed to {@code -path}
     * @param str2    substring identifying the output row to check
     * @param strArr  header column names to check, or {@code null}
     * @param strArr2 values paired with those columns, or {@code null}
     * @param z       forwarded unchanged as the match/no-match flag of {@code VerifyCmdOutput}
     */
    public static void verifyTableCFList(String str, String str2, String[] strArr, String[] strArr2, boolean z) throws Exception {
        final String command = "maprcli table cf list -path " + str;
        System.out.println("\n" + command);
        final String output = DBTests.ExecuteShellCmd(command);
        VerifyCmdOutput(output, str2, strArr, strArr2, z);
    }

    /**
     * Runs {@code maprcli stream edit -path <str> <str2> <str3>} and returns whatever
     * {@code DBTests.ExecuteShellCmdAndGetReturnCode} reports for it (presumably the process exit
     * code; callers in this class compare it against 0).
     *
     * @param str  stream path passed to {@code -path}
     * @param str2 option name, e.g. {@code -compression}
     * @param str3 option value
     * @return the return code of the command
     */
    public static int editStreamTable(String str, String str2, String str3) throws Exception {
        final String command = "maprcli stream edit -path " + str + " " + str2 + " " + str3;
        System.out.println("\n" + command);
        final int returnCode = DBTests.ExecuteShellCmdAndGetReturnCode(command);
        return returnCode;
    }

    /**
     * Resumes a changelog via {@code maprcli table changelog resume}, prints the command output,
     * then checks through {@link #verifyListCLG} that its {@code paused} column reads
     * {@code false}.
     *
     * @param str  changelog identifier passed to {@code -changelog}
     * @param str2 source table path passed to {@code -path}
     */
    public static void resumeCLG(String str, String str2) throws Exception {
        final String command = "maprcli table changelog resume -path " + str2 + " -changelog " + str;
        final String output = DBTests.ExecuteShellCmd(command);
        System.out.println(output);
        final String[] columns = {"paused"};
        final String[] expected = {"false"};
        verifyListCLG(str2, str, columns, expected);
    }

    /**
     * Pauses a changelog via {@code maprcli table changelog pause}, prints the command output,
     * then checks through {@link #verifyListCLG} that its {@code paused} column reads
     * {@code true}.
     *
     * @param str  changelog identifier passed to {@code -changelog}
     * @param str2 source table path passed to {@code -path}
     */
    public static void pauseCLG(String str, String str2) throws Exception {
        final String command = "maprcli table changelog pause -path " + str2 + " -changelog " + str;
        final String output = DBTests.ExecuteShellCmd(command);
        System.out.println(output);
        final String[] columns = {"paused"};
        final String[] expected = {"true"};
        verifyListCLG(str2, str, columns, expected);
    }

    /**
     * Runs {@code maprcli table changelog info -changelog <str>}, prints the output and checks the
     * row containing {@code str2} with {@link #VerifyCmdOutput}, requiring a positive match.
     *
     * @param str     changelog identifier passed to {@code -changelog}
     * @param str2    substring identifying the output row to check
     * @param strArr  header column names to check, or {@code null}
     * @param strArr2 expected values for those columns, or {@code null}
     */
    public static void verifyInfoCLG(String str, String str2, String[] strArr, String[] strArr2) throws Exception {
        final String command = "maprcli table changelog info -changelog " + str;
        final String output = DBTests.ExecuteShellCmd(command);
        // The output is echoed here and again by VerifyCmdOutput.
        System.out.println(output);
        VerifyCmdOutput(output, str2, strArr, strArr2, true);
    }

    /**
     * Runs {@code maprcli table changelog list -path <str>}, prints the output and checks the row
     * containing {@code str2} with {@link #VerifyCmdOutput}, requiring a positive match.
     *
     * @param str     source table path passed to {@code -path}
     * @param str2    substring identifying the output row to check
     * @param strArr  header column names to check, or {@code null}
     * @param strArr2 expected values for those columns, or {@code null}
     */
    public static void verifyListCLG(String str, String str2, String[] strArr, String[] strArr2) throws Exception {
        final String command = "maprcli table changelog list -path " + str;
        final String output = DBTests.ExecuteShellCmd(command);
        // The output is echoed here and again by VerifyCmdOutput.
        System.out.println(output);
        VerifyCmdOutput(output, str2, strArr, strArr2, true);
    }

    /**
     * One-time setup: prints a banner with per-test duration estimates and creates the shared
     * admin handle used by the tests.
     */
    @BeforeClass
    public static void startupBeforeClass() throws IOException {
        // Compile-time concatenation; the printed text is a single string, one banner line per piece.
        final String banner =
                "--- On single node cluster without other workload, these tests takes total about 20 minutes, please wait ---\n"
                + "TestCDPSCLIWithCluster#testCLGAddPause   ------ 10 minutes\n"
                + "TestCDPSCLIWithCluster#testCLGAddColumns ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testStream        ------ 2 minutes\n"
                + "TestCDPSCLIWithCluster#testStress1       ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testStress2       ------ 3 minutes\n";
        System.out.println(banner);
        testAdmin = MapRDBImpl.newAdmin();
    }

    /**
     * One-time teardown: closes the shared admin handle, if one was created, and prints a
     * completion marker.
     *
     * @throws Exception if closing the admin handle fails
     */
    @AfterClass
    public static void cleanupAfterClass() throws Exception {
        // JUnit runs @AfterClass even when @BeforeClass threw, in which case testAdmin is still
        // null; guard so the original setup failure is not buried under a NullPointerException.
        if (testAdmin != null) {
            testAdmin.close();
        }
        System.out.println("Done!");
    }

    /**
     * Exercises {@code maprcli stream info/edit} against a regular stream and a changelog stream,
     * then sets up a changelog from a JSON table.
     *
     * <p>As asserted below: {@code -compression} may be edited on both streams, while
     * {@code -autocreate} and {@code -defaultpartitions} edits must fail on the changelog stream;
     * attaching a changelog must fail for the regular stream and succeed for the changelog
     * stream.
     */
    @Test
    public void testStream() throws Exception {
        final String streamPath = "/tmp/streamTable1";
        final String changelogStreamPath = "/tmp/chgTable1";
        final String jsonTablePath = "/tmp/jsonTable1";
        final String changelog = "/tmp/chgTable1:jsonTable1";
        final String regularStreamTopic = "/tmp/streamTable1:jsonTable1";

        TestCDPSUtil.replaceStreamTable(streamPath, false, 3);
        TestCDPSUtil.replaceStreamTable(changelogStreamPath, true, 3);
        verifyInfoStreamTable(streamPath, new String[]{"ischangelog", "defaultpartitions"}, new String[]{"false", "3"});
        verifyInfoStreamTable(changelogStreamPath, new String[]{"produceperm", "ischangelog", "defaultpartitions"}, new String[]{"u:mapr", "true", "3"});

        // JUnit convention is (expected, actual); with the arguments swapped a failure message
        // would report the two values the wrong way round.
        Assert.assertEquals(0L, editStreamTable(streamPath, "-compression", "zlib"));
        Assert.assertEquals(0L, editStreamTable(changelogStreamPath, "-compression", "zlib"));
        Assert.assertEquals(0L, editStreamTable(streamPath, "-autocreate", "false"));
        Assert.assertNotEquals(0L, editStreamTable(changelogStreamPath, "-autocreate", "false"));
        Assert.assertEquals(0L, editStreamTable(streamPath, "-defaultpartitions", "4"));
        Assert.assertNotEquals(0L, editStreamTable(changelogStreamPath, "-defaultpartitions", "4"));

        final String[] editedColumns = {"ischangelog", "defaultpartitions", "compression", "autocreate"};
        verifyInfoStreamTable(streamPath, editedColumns, new String[]{"false", "4", "zlib", "false"});
        // The rejected edits must have left the changelog stream's settings untouched.
        verifyInfoStreamTable(changelogStreamPath, editedColumns, new String[]{"true", "3", "zlib", "true"});

        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTablePath);
        Assert.assertNotEquals(0L, TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePath, streamPath, regularStreamTopic));
        Assert.assertEquals(0L, TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePath, changelogStreamPath, changelog));
        verifyListCLG(jsonTablePath, changelog, null, null);
        verifyListTopicStreamTable(changelogStreamPath, "jsonTable1", null, null);
        verifyInfoCLG(changelog, jsonTablePath, null, null);
    }

    /**
     * Verifies a change record whose head operation is {@code SET} and whose first node carries
     * the whole expected document.
     *
     * @param str              expected record id, forwarded to {@code TestCDPSUtil.verifyHead}
     * @param changeDataRecord record under test
     * @param document         expected content of the record's first node
     * @param z                when {@code true}, the record is printed before being verified
     */
    public static void verifyColumnDataThroughIter1(String str, ChangeDataRecord changeDataRecord, Document document, boolean z) throws IOException {
        if (z) {
            TestCDPSUtil.printCDRec(changeDataRecord);
        }
        System.out.println("-----input rec " + document);
        TestCDPSUtil.verifyHead(changeDataRecord, true, str, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.SET);
        // Only the first entry of the record is inspected.
        KeyValue firstEntry = (KeyValue) changeDataRecord.iterator().next();
        ChangeNode firstNode = (ChangeNode) firstEntry.getValue();
        TestCDPSUtil.assertNodeEquals(firstNode, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.SET, changeDataRecord.getOpTimestamp(), null, (Value) document);
    }

    /**
     * Verifies a {@code MERGE} change record whose first three nodes are, in order, for the field
     * paths {@code a.b.c}, {@code g} and {@code x.y.z.w}.
     *
     * @param str              expected record id, forwarded to {@code TestCDPSUtil.verifyHead}
     * @param changeDataRecord record under test
     * @param document         document supplying the expected value at each of the three paths
     * @param z                when {@code true}, the record is printed before being verified
     * @param changeOpArr      expected operation for each of the three nodes, in the same order;
     *                         indexes 0 to 2 are read
     */
    public static void verifyColumnDataThroughIter2(String str, ChangeDataRecord changeDataRecord, Document document, boolean z, ChangeOp[] changeOpArr) throws IOException {
        if (z) {
            TestCDPSUtil.printCDRec(changeDataRecord);
        }
        System.out.println("-----input rec " + document);
        TestCDPSUtil.verifyHead(changeDataRecord, true, str, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.MERGE);
        final long recordTimestamp = changeDataRecord.getOpTimestamp();
        final String[] expectedPaths = {"a.b.c", "g", "x.y.z.w"};
        Iterator<?> nodes = changeDataRecord.iterator();
        for (int k = 0; k < expectedPaths.length; k++) {
            ChangeNode node = (ChangeNode) ((KeyValue) nodes.next()).getValue();
            TestCDPSUtil.assertNodeEquals(node, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, changeOpArr[k], recordTimestamp, expectedPaths[k], document.getValue(expectedPaths[k]));
        }
    }

    /**
     * Verifies a {@code MERGE} change record whose leading nodes are {@code SET} operations on the
     * given field paths, in the given order.
     *
     * @param str              expected record id, forwarded to {@code TestCDPSUtil.verifyHead}
     * @param changeDataRecord record under test
     * @param document         document supplying the expected value at each path
     * @param z                when {@code true}, the record is printed before being verified
     * @param strArr           field paths expected on consecutive nodes of the record
     */
    public static void verifyColumnDataThroughIter3(String str, ChangeDataRecord changeDataRecord, Document document, boolean z, String[] strArr) throws IOException {
        if (z) {
            TestCDPSUtil.printCDRec(changeDataRecord);
        }
        System.out.println("-----input rec " + document);
        TestCDPSUtil.verifyHead(changeDataRecord, true, str, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.MERGE);
        final long recordTimestamp = changeDataRecord.getOpTimestamp();
        Iterator<?> nodes = changeDataRecord.iterator();
        for (String fieldPath : strArr) {
            // One node is consumed per expected path; nodes beyond the last path are not inspected.
            ChangeNode node = (ChangeNode) ((KeyValue) nodes.next()).getValue();
            TestCDPSUtil.assertNodeEquals(node, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.SET, recordTimestamp, fieldPath, document.getValue(fieldPath));
        }
    }

    /**
     * Checks that a changelog set up in the paused state delivers nothing until resumed, and that
     * pausing again holds back later writes until the next resume.
     *
     * <p>Flow: 20 rows are written before the changelog exists; the changelog is created and is
     * verified to be paused; 0 records are fetched; after resume 20 are fetched; after pause and
     * 10 more writes 0 are fetched; after resume 10 are fetched.
     */
    @Test
    public void testCLGAddPause() throws Exception {
        final String srcTablePath = "/tmp/jsrc1";
        final String streamPath = "/tmp/chglogdst1";
        final String changelog = "/tmp/chglogdst1:jsrc1";

        TestCDPSUtil.replaceStreamTable(streamPath, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, srcTablePath);
        Table table = MapRDBImpl.getTable(srcTablePath);
        KafkaConsumer<byte[], ChangeDataRecord> consumer = null;
        try {
            DBDocumentImpl row = new DBDocumentImpl();
            row.set("dummyf1", 10).set("dummyf2", 20);
            for (int i = 0; i < 20; i++) {
                table.insertOrReplace("row" + i, row);
                table.flush();
            }
            TestCDPSUtil.setupCDPSReplicaWithColumns(srcTablePath, streamPath, changelog, true, null);
            verifyListCLG(srcTablePath, changelog, new String[]{"paused"}, new String[]{"true"});
            consumer = TestCDPSUtil.startConsumer(changelog);

            Assert.assertEquals(0L, TestCDPSUtil.fetchChangeDataWithBreak(20, consumer, 30).size());
            resumeCLG(changelog, srcTablePath);
            Assert.assertEquals(20L, TestCDPSUtil.fetchChangeDataWithBreak(20, consumer, 30).size());

            pauseCLG(changelog, srcTablePath);
            for (int i = 0; i < 10; i++) {
                table.insertOrReplace("row" + i, row);
                table.flush();
            }
            Assert.assertEquals(0L, TestCDPSUtil.fetchChangeDataWithBreak(10, consumer, 30).size());
            resumeCLG(changelog, srcTablePath);
            Assert.assertEquals(10L, TestCDPSUtil.fetchChangeDataWithBreak(10, consumer, 30).size());
        } finally {
            // Release the consumer and the table even when an assertion above fails.
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                table.close();
            }
        }
    }

    /**
     * Checks a changelog restricted to the columns {@code a.b.c,g,x.y.z.w}.
     *
     * <p>Rows are written with a mix of fields inside and outside those columns. For each write
     * that is expected to produce a change record, the document that record should carry is
     * appended to {@code expected}, in write order. Writes that touch none of the selected
     * columns (row2, and the update of row5) add no expectation.
     */
    @Test
    public void testCLGAddColumns() throws Exception {
        final String srcTablePath = "/tmp/jcolsrc1";
        final String streamPath = "/tmp/chglogcoldst1";
        final String changelog = "/tmp/chglogcoldst1:jcolsrc1";
        final String columns = "a.b.c,g,x.y.z.w";
        final List<Document> expected = new ArrayList<>();

        TestCDPSUtil.replaceStreamTable(streamPath, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, srcTablePath);
        DBDocumentImpl row1 = new DBDocumentImpl();
        row1.set("a.b.c.f1", "abcv1").set("a.b.c.d.f1", "abcdv1").set("a.e.f1", "aev1").set("g.f1", "gv1").set("h.f1", "gv1").set("c.d.f1", "cdv1").set("x.y.z.w.f1", "xyzwv1").set("x.y.z.f1", "xyzv1");
        Table table = MapRDBImpl.getTable(srcTablePath);
        KafkaConsumer<byte[], ChangeDataRecord> consumer = null;
        try {
            // ---- rows written before the changelog exists ----
            table.insertOrReplace("row1", row1);
            table.flush();
            // After the insert, the same object is reused to hold the expected (filtered) content.
            row1.empty();
            row1.set("a.b.c.f1", "abcv1").set("a.b.c.d.f1", "abcdv1").set("g.f1", "gv1").set("x.y.z.w.f1", "xyzwv1");
            expected.add(row1);                                   // expected[0]

            DBDocumentImpl row2 = new DBDocumentImpl();
            row2.set("m.n.f2", "mnv2").set("p.q.r.f2", "pqrv2").set("h.f2", "pqrv2");
            table.insertOrReplace("row2", row2);
            table.flush();

            DBDocumentImpl row3 = new DBDocumentImpl();
            row3.set("a.b.c.f3", "abcv3").set("g.f3", "gv3");
            table.insertOrReplace("row3", row3);
            table.flush();
            expected.add(row3);                                   // expected[1]

            TestCDPSUtil.setupCDPSReplicaWithColumns(srcTablePath, streamPath, changelog, false, columns);
            verifyInfoCLG(changelog, srcTablePath, null, null);
            verifyListCLG(srcTablePath, streamPath, new String[]{"Columns"}, new String[]{columns});
            consumer = TestCDPSUtil.startConsumer(changelog);

            List<ConsumerRecord<byte[], ChangeDataRecord>> initialRecords = TestCDPSUtil.fetchChangeData(2, consumer);
            verifyColumnDataThroughIter1("row1", initialRecords.get(0).value(), expected.get(0), true);
            verifyColumnDataThroughIter1("row3", initialRecords.get(1).value(), expected.get(1), true);

            // ---- rows written while the changelog is active ----
            DBDocumentImpl row4 = new DBDocumentImpl();
            row4.set("a.b.c.f4", "abcv4").set("a.b.c.d.f4", "abcdv4").set("a.e.f4", "aev4").set("g.f4", "gv4").set("h.f4", "gv4").set("c.d.f4", "cdv4").set("x.y.z.w.f4", "xyzwv4").set("x.y.z.f4", "xyzv4");
            table.insertOrReplace("row4", row4);
            table.flush();
            row4.empty();
            row4.set("a.b.c.f4", "abcv4").set("a.b.c.d.f4", "abcdv4").set("g.f4", "gv4").set("x.y.z.w.f4", "xyzwv4");
            expected.add(row4);                                   // expected[2]

            DBDocumentImpl row5 = new DBDocumentImpl();
            row5.set("m.n.f5", "mnv5").set("p.q.r.f5", "pqrv5").set("h.f5", "pqrv5");
            table.insertOrReplace("row5", row5);
            table.flush();
            row5.empty();
            row5.setNull("a.b.c").setNull("g").setNull("x.y.z.w");
            expected.add(row5);                                   // expected[3]

            DBDocumentImpl row6 = new DBDocumentImpl();
            row6.set("a.b.c.f6", "abcv6").set("x.y.z.w.f6", "xyzwv6");
            table.insertOrReplace("row6", row6);
            table.flush();
            row6.empty();
            row6.set("a.b.c.f6", "abcv6").setNull("g").set("x.y.z.w.f6", "xyzwv6");
            expected.add(row6);                                   // expected[4]

            // ---- updates ----
            DocumentMutation row4Update = MapRDBImpl.newMutation();
            row4Update.set("a.b.c.f4A", "abcv4A").set("c.d.f4A", "c.dv4A");
            table.update("row4", row4Update);
            table.flush();
            DBDocumentImpl row4UpdateExpected = new DBDocumentImpl();
            row4UpdateExpected.empty();
            row4UpdateExpected.set("a.b.c.f4A", "abcv4A");
            expected.add(row4UpdateExpected);                     // expected[5]

            DocumentMutation row5Update = MapRDBImpl.newMutation();
            row5Update.set("w.x.f5A", "wxv5A").set("map1.f5A", "map1v5A").set("apple.f5A", "applev5A");
            table.update("row5", row5Update);
            table.flush();

            DocumentMutation row6Update = MapRDBImpl.newMutation();
            row6Update.set("g.map.f6A", "gmapv6A").set("x.y.z.w.f6A", "xyzwv6A");
            table.update("row6", row6Update);
            table.flush();
            DBDocumentImpl row6UpdateExpected = new DBDocumentImpl();
            row6UpdateExpected.empty();
            row6UpdateExpected.set("g.map.f6A", "gmapv6A").set("x.y.z.w.f6A", "xyzwv6A");
            expected.add(row6UpdateExpected);                     // expected[6]

            for (int i = 0; i < expected.size(); i++) {
                System.out.println("input rec" + i + ": " + expected.get(i));
            }

            // The five new records line up with expected[2..6].
            List<ConsumerRecord<byte[], ChangeDataRecord>> laterRecords = TestCDPSUtil.fetchChangeData(5, consumer);
            verifyColumnDataThroughIter2("row4", laterRecords.get(0).value(), expected.get(2), true, new ChangeOp[]{ChangeOp.SET, ChangeOp.SET, ChangeOp.SET});
            verifyColumnDataThroughIter2("row5", laterRecords.get(1).value(), expected.get(3), true, new ChangeOp[]{ChangeOp.DELETE, ChangeOp.DELETE, ChangeOp.DELETE});
            verifyColumnDataThroughIter2("row6", laterRecords.get(2).value(), expected.get(4), true, new ChangeOp[]{ChangeOp.SET, ChangeOp.DELETE, ChangeOp.SET});
            verifyColumnDataThroughIter3("row4", laterRecords.get(3).value(), expected.get(5), true, new String[]{"a.b.c.f4A"});
            verifyColumnDataThroughIter3("row6", laterRecords.get(4).value(), expected.get(6), true, new String[]{"g.map.f6A", "x.y.z.w.f6A"});
        } finally {
            // Release the consumer and the table even when a verification above fails.
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                table.close();
            }
        }
    }

    /**
     * Checks that an unrestricted changelog keeps delivering full-row records while column
     * families of the source table are added ({@code cf3} on {@code e.f}), altered
     * ({@code cf2} renamed to {@code cf22} on {@code c.d}) and deleted ({@code cf3}).
     */
    @Test
    public void testCLGChangeColumns() throws Exception {
        final String srcTablePath = "/tmp/jcolsrc2";
        final String streamPath = "/tmp/chglogcoldst2";
        final String changelog = "/tmp/chglogcoldst2:jcolsrc2";

        TestCDPSUtil.replaceStreamTable(streamPath, true, 1);
        // NOTE(review): left as a raw HashMap because the parameter type of
        // replaceJsonTableWithCFMap is not visible from this file.
        HashMap cfMap = new HashMap();
        cfMap.put("cf2", "c.d");
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, srcTablePath, cfMap);
        DBDocumentImpl row1 = new DBDocumentImpl();
        row1.set("a.b.m1.data1", "abmd1").set("c.d.m1.int1", 101);
        Table table = MapRDBImpl.getTable(srcTablePath);
        KafkaConsumer<byte[], ChangeDataRecord> consumer = null;
        try {
            table.insertOrReplace("row1", row1);
            table.flush();
            TestCDPSUtil.setupCDPSReplicaWithColumns(srcTablePath, streamPath, changelog, false, null);
            verifyInfoCLG(changelog, srcTablePath, null, null);
            System.out.println("input row1: " + row1);
            consumer = TestCDPSUtil.startConsumer(changelog);
            verifyColumnDataThroughIter1("row1", TestCDPSUtil.fetchChangeData(1, consumer).get(0).value(), row1, true);

            // ---- add column family cf3 on e.f ----
            FamilyDescriptor cf3Descriptor = MapRDBImpl.newFamilyDescriptor().setName("cf3").setJsonFieldPath(FieldPath.parseFrom("e.f")).setCompression(FamilyDescriptor.Compression.None);
            Path tablePath = new Path(srcTablePath);
            testAdmin.addFamily(tablePath, cf3Descriptor);
            DBDocumentImpl row2 = new DBDocumentImpl();
            row2.set("a.b.m1.data2", "abm1d2").set("c.d.m1.int2", 102).set("e.f.str2", "efs2");
            table.insertOrReplace("row2", row2);
            table.flush();
            System.out.println("input row2: " + row2);
            verifyColumnDataThroughIter1("row2", TestCDPSUtil.fetchChangeData(1, consumer).get(0).value(), row2, true);
            System.out.println("--- Passed the CF add test ---");

            // ---- alter column family cf2 -> cf22 (still on c.d) ----
            verifyTableCFList(srcTablePath, "c.d", new String[]{"cfname"}, new String[]{"cf2"}, true);
            testAdmin.alterFamily(tablePath, "cf2", MapRDBImpl.newFamilyDescriptor().setName("cf22").setJsonFieldPath(FieldPath.parseFrom("c.d")).setCompression(FamilyDescriptor.Compression.None));
            verifyTableCFList(srcTablePath, "c.d", new String[]{"cfname"}, new String[]{"cf22"}, true);
            DBDocumentImpl row3 = new DBDocumentImpl();
            row3.set("a.b.m1.data3", "abm1d3").set("c.d.m1.int3", 103).set("e.f.str3", "efs3");
            table.insertOrReplace("row3", row3);
            table.flush();
            System.out.println("input row3: " + row3);
            verifyColumnDataThroughIter1("row3", TestCDPSUtil.fetchChangeData(1, consumer).get(0).value(), row3, true);
            // This step follows alterFamily, so label it as the alter test (it used to repeat
            // the "CF add" message from the previous step).
            System.out.println("--- Passed the CF alter test ---");

            // ---- delete column family cf3 after two more rows were written ----
            DBDocumentImpl row4 = new DBDocumentImpl();
            row4.set("a.b.m1.data4", "abm1d4").set("c.d.m1.int4", 104).set("e.f.str4", "efs4");
            table.insertOrReplace("row4", row4);
            System.out.println("input row4: " + row4);
            DBDocumentImpl row5 = new DBDocumentImpl();
            row5.set("a.b.m1.data5", "abm1d5").set("c.d.m1.int5", 105);
            table.insertOrReplace("row5", row5);
            table.flush();
            System.out.println("input row5: " + row5);
            String[] cfNameColumn = {"cfname"};
            String[] cf3Name = {"cf3"};
            verifyTableCFList(srcTablePath, "e.f", cfNameColumn, cf3Name, true);
            testAdmin.deleteFamily(tablePath, "cf3");
            // With the flag false, the cf list must no longer contain any row mentioning e.f.
            verifyTableCFList(srcTablePath, "e.f", cfNameColumn, cf3Name, false);

            List<ConsumerRecord<byte[], ChangeDataRecord>> lastRecords = TestCDPSUtil.fetchChangeData(2, consumer);
            ChangeDataRecord row4Record = lastRecords.get(0).value();
            TestCDPSUtil.printCDRec(row4Record);
            verifyColumnDataThroughIter1("row4", row4Record, row4, true);
            ChangeDataRecord row5Record = lastRecords.get(1).value();
            TestCDPSUtil.printCDRec(row5Record);
            verifyColumnDataThroughIter1("row5", row5Record, row5, true);
        } finally {
            // Release the consumer and the table even when a verification above fails.
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                table.close();
            }
        }
    }

    /**
     * Single-threaded volume test: writes 1000 rows to a table with an unrestricted changelog,
     * fetches 1000 change records and verifies each against a document rebuilt from the numeric
     * suffix of the record's id.
     */
    @Test
    public void testStress1() throws Exception {
        final String srcTablePath = "/tmp/jstresssrc1";
        final String streamPath = "/tmp/chglogstressdst1";
        final String changelog = "/tmp/chglogstressdst1:jstresssrc1";
        final String rowPrefix = "row";
        final int rowCount = 1000;

        TestCDPSUtil.replaceScfJsonTable(testAdmin, srcTablePath);
        Table table = MapRDBImpl.getTable(srcTablePath);
        KafkaConsumer<byte[], ChangeDataRecord> consumer = null;
        try {
            TestCDPSUtil.replaceStreamTable(streamPath, true, 1);
            TestCDPSUtil.setupCDPSReplicaWithColumns(srcTablePath, streamPath, changelog, false, null);
            verifyInfoCLG(changelog, srcTablePath, null, null);
            consumer = TestCDPSUtil.startConsumer(changelog);

            for (int i = 0; i < rowCount; i++) {
                DBDocumentImpl row = new DBDocumentImpl();
                row.set("a.b.m1.data", "abm1d" + i).set("c.d.m1.int", i).set("e.f.str", "efs" + i);
                table.insertOrReplace(rowPrefix + i, row);
            }

            List<ConsumerRecord<byte[], ChangeDataRecord>> records = TestCDPSUtil.fetchChangeData(rowCount, consumer);
            for (int i = 0; i < rowCount; i++) {
                ChangeDataRecord record = records.get(i).value();
                String rowId = record.getId().getString();
                // The expected content is derived from the id itself, so the check does not
                // depend on the order in which records arrive.
                String suffix = rowId.substring(rowPrefix.length());
                DBDocumentImpl expectedRow = new DBDocumentImpl();
                expectedRow.set("a.b.m1.data", "abm1d" + suffix).set("c.d.m1.int", Integer.parseInt(suffix)).set("e.f.str", "efs" + suffix);
                verifyColumnDataThroughIter1(rowId, record, expectedRow, false);
            }
        } finally {
            // Release the consumer and the table even when a verification above fails.
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                table.close();
            }
        }
    }

    /**
     * Multi-threaded volume test: one {@code JsonConsumer} thread and four {@code JsonLoader}
     * threads run concurrently against the same table. The test waits for the loaders first and
     * for the consumer last, then closes the table.
     *
     * <p>NOTE(review): {@code JsonConsumer} and {@code JsonLoader} are defined outside this file.
     * Their arguments look like an expected record count (2000) and half-open row ranges of 500
     * each; confirm against those classes.
     */
    @Test
    public void testStress2() throws Exception {
        final String srcTablePath = "/tmp/jstresssrc2";
        final String streamPath = "/tmp/chglogstressdst2";
        final String changelog = "/tmp/chglogstressdst2:jstresssrc2";

        TestCDPSUtil.replaceScfJsonTable(testAdmin, srcTablePath);
        Table table = MapRDBImpl.getTable(srcTablePath);
        TestCDPSUtil.replaceStreamTable(streamPath, true, 1);
        TestCDPSUtil.setupCDPSReplicaWithColumns(srcTablePath, streamPath, changelog, false, null);
        verifyInfoCLG(changelog, srcTablePath, null, null);

        // Tasks are built first, then their threads: consumer before loaders in both steps.
        JsonConsumer consumerTask = new JsonConsumer(changelog, 2000);
        JsonLoader[] loaderTasks = {
            new JsonLoader(0, 500, table),
            new JsonLoader(500, 1000, table),
            new JsonLoader(1000, 1500, table),
            new JsonLoader(1500, 2000, table),
        };
        Thread consumerThread = new Thread(consumerTask, "consumer1");
        Thread[] loaderThreads = new Thread[loaderTasks.length];
        for (int k = 0; k < loaderTasks.length; k++) {
            // Thread names come out as loader21 .. loader24.
            loaderThreads[k] = new Thread(loaderTasks[k], "loader2" + (k + 1));
        }

        consumerThread.start();
        for (Thread loaderThread : loaderThreads) {
            loaderThread.start();
        }
        for (Thread loaderThread : loaderThreads) {
            loaderThread.join();
        }
        consumerThread.join();
        table.close();
    }
}
