/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.tests;

import com.mapr.db.FamilyDescriptor;
import com.mapr.db.JsonTable;
import com.mapr.db.Table;
import com.mapr.db.cdc.tests.JsonConsumer;
import com.mapr.db.cdc.tests.JsonLoader;
import com.mapr.db.cdc.tests.SimpleProducer;
import com.mapr.db.cdc.tests.TestCDPSMisc;
import com.mapr.db.cdc.tests.TestCDPSUtil;
import com.mapr.db.impl.AdminImpl;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.rowcol.DBDocumentImpl;
import com.mapr.db.tests.utils.DBTests;
import com.mapr.fs.utils.ssh.TestCluster;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.Document;
import org.ojai.FieldPath;
import org.ojai.KeyValue;
import org.ojai.Value;
import org.ojai.store.DocumentMutation;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeEvent;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.store.cdc.ChangeOp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class TestCDPSCLIWithCluster
extends BaseTest {
    /** Class-wide SLF4J logger. */
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSCLIWithCluster.class);

    /** Admin handle shared by the tests; assigned in {@code startupBeforeClass} and closed in {@code cleanupAfterClass}. */
    private static AdminImpl testAdmin;

    /** Path string of the {@code ted} file (presumably relative to the repository root; confirm against its users). */
    static final String TED_FILE = "build_fileserver/fs/common/ted/ted";

    /** Repository root path; unset ({@code null}) until some code assigns it. */
    static java.nio.file.Path repoRootPath;

    /** Path of the {@code ted} command; unset ({@code null}) until some code assigns it. */
    static java.nio.file.Path tedCmdPath;

    /** Argument suffix (note the leading space) that enables {@code ExpandMutationArray}. */
    static final String SET_TED_CMD = " enable ExpandMutationArray";

    /** Argument suffix (note the leading space) that disables {@code ExpandMutationArray}. */
    static final String RESET_TED_CMD = " disable ExpandMutationArray";

    /**
     * Runs {@code maprcli stream info -path <tableName>} and hands the command's stdout to
     * {@code TestCDPSUtil.VerifyCmdOutput}, using the stream path itself as the lookup key.
     *
     * @param tableName path of the stream to inspect
     * @param colChglog column names passed to the verifier (may be {@code null}, as callers in this class do)
     * @param valChglog values passed to the verifier alongside {@code colChglog}
     * @throws Exception if running the command or the verification fails
     */
    public static void verifyInfoStreamTable(String tableName, String[] colChglog, String[] valChglog) throws Exception {
        final String infoCommand = "maprcli stream info -path " + tableName;
        System.out.println("\n" + infoCommand);
        final String stdout = TestCluster.runCommand(infoCommand).getStdOut();
        TestCDPSUtil.VerifyCmdOutput(stdout, tableName, colChglog, valChglog, true);
    }

    /**
     * Runs {@code maprcli stream topic list -path <tableName>} and passes its stdout to
     * {@code TestCDPSUtil.VerifyCmdOutput}, keyed by the topic name.
     *
     * @param tableName path of the stream whose topics are listed
     * @param topicname lookup key handed to the verifier
     * @param colChglog column names passed to the verifier
     * @param valChglog values passed to the verifier alongside {@code colChglog}
     * @throws Exception if running the command or the verification fails
     */
    public static void verifyListTopicStreamTable(String tableName, String topicname, String[] colChglog, String[] valChglog) throws Exception {
        final String listTopicsCommand = "maprcli stream topic list -path " + tableName;
        System.out.println("\n" + listTopicsCommand);
        TestCDPSUtil.VerifyCmdOutput(
                TestCluster.runCommand(listTopicsCommand).getStdOut(),
                topicname,
                colChglog,
                valChglog,
                true);
    }

    /**
     * Runs {@code maprcli table cf list -path <tableName>} and passes its stdout to
     * {@code TestCDPSUtil.VerifyCmdOutput}, keyed by {@code cfpath}.
     *
     * @param tableName   path of the table whose column families are listed
     * @param cfpath      lookup key handed to the verifier
     * @param colChglog   column names passed to the verifier
     * @param valChglog   values passed to the verifier alongside {@code colChglog}
     * @param verifyexist forwarded unchanged as the verifier's last argument
     *                    (presumably whether the key is expected to be present)
     * @throws Exception if running the command or the verification fails
     */
    public static void verifyTableCFList(String tableName, String cfpath, String[] colChglog, String[] valChglog, boolean verifyexist) throws Exception {
        final String cfListCommand = "maprcli table cf list -path " + tableName;
        System.out.println("\n" + cfListCommand);
        final String stdout = TestCluster.runCommand(cfListCommand).getStdOut();
        TestCDPSUtil.VerifyCmdOutput(stdout, cfpath, colChglog, valChglog, verifyexist);
    }

    /**
     * Echoes and runs {@code maprcli stream delete -path <tableName>}.
     *
     * @param tableName path of the stream to delete
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int deleteStreamTable(String tableName) throws Exception {
        final String deleteCommand = "maprcli stream delete -path " + tableName;
        System.out.println("\n" + deleteCommand);
        final int exitCode = TestCluster.runCommand(deleteCommand).getExitCode();
        return exitCode;
    }

    /**
     * Echoes and runs {@code maprcli stream edit -path <tableName> <paraName> <paraVal>}.
     *
     * @param tableName path of the stream to edit
     * @param paraName  option name, including its leading dash (e.g. {@code -compression})
     * @param paraVal   value for the option
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int editStreamTable(String tableName, String paraName, String paraVal) throws Exception {
        final String editCommand = new StringBuilder("maprcli stream edit -path ")
                .append(tableName)
                .append(" ")
                .append(paraName)
                .append(" ")
                .append(paraVal)
                .toString();
        System.out.println("\n" + editCommand);
        final int exitCode = TestCluster.runCommand(editCommand).getExitCode();
        return exitCode;
    }

    /**
     * Echoes and runs {@code maprcli stream topic create -path <tableName> -topic <topicName>}.
     *
     * @param tableName path of the stream
     * @param topicName name of the topic to create
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int createStreamTopic(String tableName, String topicName) throws Exception {
        final String createTopicCommand =
                "maprcli stream topic create -path " + tableName
                + " -topic " + topicName;
        System.out.println("\n" + createTopicCommand);
        final int exitCode = TestCluster.runCommand(createTopicCommand).getExitCode();
        return exitCode;
    }

    /**
     * Echoes and runs {@code maprcli stream topic delete -path <tableName> -topic <topicName>}.
     *
     * @param tableName path of the stream
     * @param topicName name of the topic to delete
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int deleteStreamTopic(String tableName, String topicName) throws Exception {
        final String deleteTopicCommand =
                "maprcli stream topic delete -path " + tableName
                + " -topic " + topicName;
        System.out.println("\n" + deleteTopicCommand);
        final int exitCode = TestCluster.runCommand(deleteTopicCommand).getExitCode();
        return exitCode;
    }

    /**
     * Echoes and runs
     * {@code maprcli table cf rename -path <tableName> -cfname <cfName> -newcfname <newCfName>}.
     *
     * @param tableName path of the table
     * @param cfName    current column family name
     * @param newCfName new column family name
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int renameCF(String tableName, String cfName, String newCfName) throws Exception {
        final String renameCommand =
                "maprcli table cf rename -path " + tableName
                + " -cfname " + cfName
                + " -newcfname " + newCfName;
        System.out.println("\n" + renameCommand);
        final int exitCode = TestCluster.runCommand(renameCommand).getExitCode();
        return exitCode;
    }

    /**
     * Echoes and runs {@code hadoop fs -mv <tableName> <newTableName>}.
     *
     * @param tableName    source path
     * @param newTableName destination path
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int mvFile(String tableName, String newTableName) throws Exception {
        final String moveCommand = new StringBuilder("hadoop fs -mv ")
                .append(tableName)
                .append(" ")
                .append(newTableName)
                .toString();
        System.out.println("\n" + moveCommand);
        final int exitCode = TestCluster.runCommand(moveCommand).getExitCode();
        return exitCode;
    }

    /**
     * Runs {@code maprcli stream info -path <tableName>} without echoing the command.
     *
     * @param tableName path of the stream to inspect
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdInfoStream(String tableName) throws Exception {
        final String infoCommand = "maprcli stream info -path " + tableName;
        return TestCluster.runCommand(infoCommand).getExitCode();
    }

    /**
     * Runs {@code maprcli table changelog resume}, prints the command's stdout, and then
     * checks through {@code verifyListCLG} that the changelog's {@code paused} column is {@code false}.
     *
     * @param topicFullName changelog given to {@code -changelog} (callers pass {@code <stream>:<topic>})
     * @param srcTableName  source table given to {@code -path}
     * @throws Exception if running the command or the verification fails
     */
    public static void resumeCLG(String topicFullName, String srcTableName) throws Exception {
        final String resumeCommand =
                "maprcli table changelog resume -path " + srcTableName
                + " -changelog " + topicFullName;
        System.out.println(TestCluster.runCommand(resumeCommand).getStdOut());
        final String[] expectedColumns = {"paused"};
        final String[] expectedValues = {"false"};
        verifyListCLG(srcTableName, topicFullName, expectedColumns, expectedValues);
    }

    /**
     * Runs {@code maprcli table changelog pause}, prints the command's stdout, and then
     * checks through {@code verifyListCLG} that the changelog's {@code paused} column is {@code true}.
     *
     * @param topicFullName changelog given to {@code -changelog} (callers pass {@code <stream>:<topic>})
     * @param srcTableName  source table given to {@code -path}
     * @throws Exception if running the command or the verification fails
     */
    public static void pauseCLG(String topicFullName, String srcTableName) throws Exception {
        final String pauseCommand =
                "maprcli table changelog pause -path " + srcTableName
                + " -changelog " + topicFullName;
        System.out.println(TestCluster.runCommand(pauseCommand).getStdOut());
        final String[] expectedColumns = {"paused"};
        final String[] expectedValues = {"true"};
        verifyListCLG(srcTableName, topicFullName, expectedColumns, expectedValues);
    }

    /**
     * Convenience form of {@code verifyInfoCLGDebug} with the debug flag switched off.
     *
     * @param topicFullName changelog to query
     * @param srcTableName  lookup key handed to the verifier
     * @param colChglog     column names passed to the verifier
     * @param valChglog     values passed to the verifier alongside {@code colChglog}
     * @throws Exception if running the command or the verification fails
     */
    public static void verifyInfoCLG(String topicFullName, String srcTableName, String[] colChglog, String[] valChglog) throws Exception {
        final boolean debugOff = false;
        verifyInfoCLGDebug(topicFullName, srcTableName, colChglog, valChglog, debugOff);
    }

    /**
     * Runs {@code maprcli table changelog info -changelog <topicFullName>} (appending
     * {@code -debug true} on request), prints the stdout, and passes it to
     * {@code TestCDPSUtil.VerifyCmdOutput} keyed by the source table name.
     *
     * @param topicFullName changelog to query
     * @param srcTableName  lookup key handed to the verifier
     * @param colChglog     column names passed to the verifier
     * @param valChglog     values passed to the verifier alongside {@code colChglog}
     * @param isDebug       whether to add {@code -debug true} to the command
     * @throws Exception if running the command or the verification fails
     */
    public static void verifyInfoCLGDebug(String topicFullName, String srcTableName, String[] colChglog, String[] valChglog, boolean isDebug) throws Exception {
        String infoCommand = "maprcli table changelog info -changelog " + topicFullName;
        if (isDebug) {
            infoCommand = infoCommand + " -debug true";
        }
        final String stdout = TestCluster.runCommand(infoCommand).getStdOut();
        System.out.println(stdout);
        TestCDPSUtil.VerifyCmdOutput(stdout, srcTableName, colChglog, valChglog, true);
    }

    /**
     * Convenience form of {@code verifyListCLGExist} that always passes {@code true} for {@code exist}.
     *
     * @param tableName source table whose changelogs are listed
     * @param key       lookup key handed to the verifier
     * @param colChglog column names passed to the verifier
     * @param valChglog values passed to the verifier alongside {@code colChglog}
     * @throws Exception if running the command or the verification fails
     */
    public static void verifyListCLG(String tableName, String key, String[] colChglog, String[] valChglog) throws Exception {
        final boolean mustExist = true;
        verifyListCLGExist(tableName, key, colChglog, valChglog, mustExist);
    }

    /**
     * Runs {@code maprcli table changelog list -refreshnow true -path <tableName>}, prints the
     * stdout, and passes it to {@code TestCDPSUtil.VerifyCmdOutput}.
     *
     * @param tableName source table whose changelogs are listed
     * @param key       lookup key handed to the verifier
     * @param colChglog column names passed to the verifier
     * @param valChglog values passed to the verifier alongside {@code colChglog}
     * @param exist     forwarded unchanged as the verifier's last argument
     *                  (presumably whether {@code key} is expected in the listing)
     * @throws Exception if running the command or the verification fails
     */
    public static void verifyListCLGExist(String tableName, String key, String[] colChglog, String[] valChglog, boolean exist) throws Exception {
        final String listCommand = "maprcli table changelog list -refreshnow true -path " + tableName;
        final String stdout = TestCluster.runCommand(listCommand).getStdOut();
        System.out.println(stdout);
        TestCDPSUtil.VerifyCmdOutput(stdout, key, colChglog, valChglog, exist);
    }

    /**
     * Runs {@code maprcli table changelog edit -path <src> -changelog <topic> -throttle <true|false>}.
     *
     * @param srcTableName  source table given to {@code -path}
     * @param topicFullName changelog given to {@code -changelog}
     * @param throttle      value rendered after {@code -throttle}
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdEditThrottleCLG(String srcTableName, String topicFullName, boolean throttle) throws Exception {
        final String editCommand =
                "maprcli table changelog edit -path " + srcTableName
                + " -changelog " + topicFullName
                + " -throttle " + throttle;
        return TestCluster.runCommand(editCommand).getExitCode();
    }

    /**
     * Runs {@code maprcli table changelog list -refreshnow true -path <srcTableName>}.
     *
     * @param srcTableName source table whose changelogs are listed
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdListCLG(String srcTableName) throws Exception {
        final String listCommand = "maprcli table changelog list -refreshnow true -path " + srcTableName;
        return TestCluster.runCommand(listCommand).getExitCode();
    }

    /**
     * Runs {@code maprcli table changelog list -refreshnow true -json -path <srcTableName>} and
     * extracts the requested fields from the command output with
     * {@code TestCDPSUtil.getFieldsFromCliJsonStr}.
     *
     * @param srcTableName source table whose changelogs are listed
     * @param fieldNames   names of the fields to extract
     * @return whatever the extractor returns for {@code fieldNames}
     * @throws Exception if running the command or the extraction fails
     */
    public static String[] cmdListCLGJson(String srcTableName, String[] fieldNames) throws Exception {
        final String listJsonCommand = "maprcli table changelog list -refreshnow true -json -path " + srcTableName;
        // Uses getOutput() rather than getStdOut(), exactly as the JSON helpers in this class do.
        final String cliOutput = TestCluster.runCommand(listJsonCommand).getOutput();
        final String[] fields = TestCDPSUtil.getFieldsFromCliJsonStr(fieldNames, cliOutput);
        return fields;
    }

    /**
     * Runs {@code maprcli table changelog pause -path <src> -changelog <topic>} and reports only
     * the exit code (no verification, no printing).
     *
     * @param srcTableName  source table given to {@code -path}
     * @param topicFullName changelog given to {@code -changelog}
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdPauseCLG(String srcTableName, String topicFullName) throws Exception {
        final String pauseCommand =
                "maprcli table changelog pause -path " + srcTableName
                + " -changelog " + topicFullName;
        return TestCluster.runCommand(pauseCommand).getExitCode();
    }

    /**
     * Runs {@code maprcli table changelog resume -path <src> -changelog <topic>} and reports only
     * the exit code (no verification, no printing).
     *
     * @param srcTableName  source table given to {@code -path}
     * @param topicFullName changelog given to {@code -changelog}
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdResumeCLG(String srcTableName, String topicFullName) throws Exception {
        final String resumeCommand =
                "maprcli table changelog resume -path " + srcTableName
                + " -changelog " + topicFullName;
        return TestCluster.runCommand(resumeCommand).getExitCode();
    }

    /**
     * Runs {@code maprcli table changelog info -changelog <topicFullName>}.
     *
     * @param topicFullName changelog to query
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdInfoCLG(String topicFullName) throws Exception {
        final String infoCommand = "maprcli table changelog info -changelog " + topicFullName;
        return TestCluster.runCommand(infoCommand).getExitCode();
    }

    /**
     * Runs {@code maprcli table changelog info -changelog <topicFullName> -json -debug true} and
     * extracts the requested fields from the command output with
     * {@code TestCDPSUtil.getFieldsFromCliJsonStr}.
     *
     * @param topicFullName changelog to query
     * @param fieldNames    names of the fields to extract
     * @return whatever the extractor returns for {@code fieldNames}
     * @throws Exception if running the command or the extraction fails
     */
    public static String[] cmdInfoCLGJson(String topicFullName, String[] fieldNames) throws Exception {
        final String infoJsonCommand =
                "maprcli table changelog info -changelog " + topicFullName
                + " -json -debug true";
        final String cliOutput = TestCluster.runCommand(infoJsonCommand).getOutput();
        final String[] fields = TestCDPSUtil.getFieldsFromCliJsonStr(fieldNames, cliOutput);
        return fields;
    }

    /**
     * Runs {@code maprcli table changelog remove -path <src> -changelog <topic>}.
     *
     * @param srcTableName  source table given to {@code -path}
     * @param topicFullName changelog given to {@code -changelog}
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdRemoveCLG(String srcTableName, String topicFullName) throws Exception {
        final String removeCommand =
                "maprcli table changelog remove -path " + srcTableName
                + " -changelog " + topicFullName;
        return TestCluster.runCommand(removeCommand).getExitCode();
    }

    /**
     * Runs {@code hadoop fs -mkdir -p <dirName>}.
     *
     * @param dirName directory path to create
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdMkdir(String dirName) throws Exception {
        // The double space before the path is part of the original command text.
        final String mkdirCommand = "hadoop fs -mkdir -p  " + dirName;
        return TestCluster.runCommand(mkdirCommand).getExitCode();
    }

    /**
     * Runs {@code hadoop fs -rm -r <dirName>}.
     *
     * @param dirName directory path to remove recursively
     * @return exit code reported for the command
     * @throws Exception if the command cannot be run
     */
    public static int cmdRmdir(String dirName) throws Exception {
        // The double space before the path is part of the original command text.
        final String rmdirCommand = "hadoop fs -rm -r  " + dirName;
        return TestCluster.runCommand(rmdirCommand).getExitCode();
    }

    /**
     * One-time setup: prints a banner with rough per-test durations and creates the shared
     * {@code testAdmin} handle through {@code MapRDBImpl.newAdmin()}.
     *
     * @throws IOException declared by the method; presumably raised when the admin handle cannot be created
     */
    @BeforeClass
    public static void startupBeforeClass() throws IOException {
        // Compile-time concatenation: the printed text is the same single string as before.
        final String banner =
                "--- On single node cluster without other workload, these tests takes total about 20 minutes, please wait ---\n"
                + "TestCDPSCLIWithCluster#testCLGAddPause         ------ 10 minutes\n"
                + "TestCDPSCLIWithCluster#testInvalidStreamTopic  ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testUseExistingTopic    ------ 1 minutes\n"
                + "TestCDPSCLIWithCluster#testCLGAddColumns       ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testStream              ------ 2 minutes\n"
                + "TestCDPSCLIWithCluster#testImpersonation       ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testStress1             ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testStress2             ------ 3 minutes\n"
                + "TestCDPSCLIWithCluster#testFakeAppend          ------ 10 minutes\n"
                + "TestCDPSCLIWithCluster#testAssignProducer      ------ 20 minutes\n";
        System.out.println(banner);
        testAdmin = (AdminImpl) MapRDBImpl.newAdmin();
    }

    /**
     * One-time teardown: closes the shared {@code testAdmin} handle and prints {@code Done!}.
     *
     * @throws Exception if closing the admin handle fails
     */
    @AfterClass
    public static void cleanupAfterClass() throws Exception {
        // JUnit still runs @AfterClass when @BeforeClass failed, in which case testAdmin was
        // never assigned; guard so a NullPointerException does not mask the original failure.
        if (testAdmin != null) {
            testAdmin.close();
        }
        System.out.println("Done!");
    }

    /**
     * Exercises the changelog CLI as the impersonated user {@code m7user1}.
     *
     * <p>Inside a proxy-user {@code doAs} the test recreates a JSON table and a changelog stream
     * through {@code sudo -u m7user1 maprcli ...}, inserts three rows, checks the stream's
     * permissions and {@code ischangelog} flag, adds the changelog, and then polls
     * {@code table changelog list} (up to 10 times, 3 s apart) until the changelog reports
     * {@code REPLICA_STATE_REPLICATING}.
     *
     * @throws Exception if any command, table operation or assertion fails
     */
    @Test
    public void testImpersonation() throws Exception {
        final String user = "m7user1";
        final String chgTable1 = "/tmp/chgImperson1";
        final String jsonTableName1 = "/tmp/jsonImperson1";
        final String chglogTopicFullName1 = "/tmp/chgImperson1:jsonImperson1";
        // Every CLI call goes through sudo as the impersonated user; the password is fed on stdin (-S).
        final String asUser = "echo password | sudo -u " + user + " -S ";
        UserGroupInformation ugi = TestCDPSCLIWithCluster.createUser(user);
        System.out.println("Running impersonation test as user: " + user);
        ugi.doAs(new PrivilegedExceptionAction<Void>(){

            @Override
            public Void run() throws Exception {
                // Best-effort cleanup of a previous run; the exit code is deliberately ignored.
                execShell(asUser + "maprcli table delete -path " + jsonTableName1, null);
                int retcode = execShell(asUser + "maprcli table create -path " + jsonTableName1 + " -tabletype json ", null);
                Assert.assertEquals(0L, retcode);

                JsonTable jsonTable = MapRDBImpl.getTable(jsonTableName1);
                DBDocumentImpl rec = new DBDocumentImpl();
                rec.set("dummyf1", 10).set("dummyf2", 20);
                for (int i = 0; i < 3; ++i) {
                    jsonTable.insertOrReplace("row" + i, (Document)rec);
                    jsonTable.flush();
                }

                // Best-effort cleanup of a previous run; the exit code is deliberately ignored.
                execShell(asUser + "maprcli stream delete -path " + chgTable1, null);
                retcode = execShell(asUser + "maprcli stream create -path " + chgTable1 + " -ischangelog true", null);
                Assert.assertEquals(0L, retcode);

                StringBuilder output = new StringBuilder();
                retcode = execShell(asUser + "maprcli stream info -path " + chgTable1, output);
                Assert.assertEquals(0L, retcode);
                String[] colChglog = new String[]{"produceperm", "ischangelog", "topicperm"};
                String[] valChglog = new String[]{"u:" + user, "true", "u:" + user};
                TestCDPSUtil.VerifyCmdOutput(output.toString(), chgTable1, colChglog, valChglog, true);

                retcode = execShell(asUser + "maprcli table changelog add -path " + jsonTableName1 + " -changelog " + chglogTopicFullName1, null);
                Assert.assertEquals(0L, retcode);

                String[] colToVerify = new String[]{"replicaState"};
                String[] valToVerify = new String[]{"REPLICA_STATE_REPLICATING"};
                boolean inBucketReplication = false;
                int waitCount = 10;
                for (int i = 0; i < waitCount; ++i) {
                    output = new StringBuilder();
                    retcode = execShell(asUser + "maprcli table changelog list -refreshnow true -path " + jsonTableName1, output);
                    Assert.assertEquals(0L, retcode);
                    inBucketReplication = TestCDPSUtil.FindValueInCmdOutput(output.toString(), chglogTopicFullName1, colToVerify, valToVerify);
                    if (inBucketReplication) {
                        break;
                    }
                    Thread.sleep(3000L);
                }
                Assert.assertTrue(inBucketReplication);
                return null;
            }

            /**
             * Runs {@code shellCmd} through {@code /bin/bash -c}, drains the child's stdout to
             * end-of-stream, and only then waits for the exit code.
             *
             * @param shellCmd command line handed to bash
             * @param captured when non-null, every stdout line is echoed to the console and
             *                 appended here followed by a newline; when null, stdout is drained silently
             * @return the child's exit code
             */
            private int execShell(String shellCmd, StringBuilder captured) throws IOException, InterruptedException {
                // stderr is forwarded to this JVM's stderr so an unread pipe can never stall the child.
                Process proc = new ProcessBuilder("/bin/bash", "-c", shellCmd)
                        .redirectError(ProcessBuilder.Redirect.INHERIT)
                        .start();
                // Read stdout BEFORE waitFor(): a child that fills the pipe buffer while nobody
                // is reading blocks forever, and waitFor() would then never return.
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        if (captured != null) {
                            System.out.println(line + "\n");
                            captured.append(line).append("\n");
                        }
                    }
                }
                return proc.waitFor();
            }
        });
    }

    /**
     * Builds a Hadoop proxy-user identity for {@code user} on top of the current login user.
     *
     * @param user name of the user to impersonate
     * @return result of {@code UserGroupInformation.createProxyUser} for {@code user} and the login user
     * @throws IOException declared by the method; presumably raised when the login user cannot be obtained
     */
    private static UserGroupInformation createUser(String user) throws IOException {
        final UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        return UserGroupInformation.createProxyUser(user, loginUser);
    }

    /**
     * Compares how {@code maprcli stream edit} treats a plain stream and a changelog stream, then
     * checks that a changelog can only be added against the changelog stream.
     *
     * <p>Expectations encoded below: {@code -compression} can be edited on both streams;
     * {@code -autocreate} and {@code -defaultpartitions} can be edited on the plain stream but
     * are rejected (non-zero exit) on the changelog stream, which keeps its original values.
     *
     * @throws Exception if any command, verification or assertion fails
     */
    @Test
    public void testStream() throws Exception {
        String streamTable1 = "/tmp/streamTable1";
        String chgTable1 = "/tmp/chgTable1";
        TestCDPSUtil.replaceStreamTable(streamTable1, false, 3);
        TestCDPSUtil.replaceStreamTable(chgTable1, true, 3);
        String[] colChglog = new String[]{"ischangelog", "defaultpartitions"};
        String[] valChglog = new String[]{"false", "3"};
        TestCDPSCLIWithCluster.verifyInfoStreamTable(streamTable1, colChglog, valChglog);
        colChglog = new String[]{"ischangelog", "defaultpartitions"};
        valChglog = new String[]{"true", "3"};
        TestCDPSCLIWithCluster.verifyInfoStreamTable(chgTable1, colChglog, valChglog);

        // JUnit's contract is (expected, actual); keeping that order makes a failure read
        // "expected:<0> but was:<N>" instead of the inverse.
        int retcode = TestCDPSCLIWithCluster.editStreamTable(streamTable1, "-compression", "zlib");
        Assert.assertEquals(0L, retcode);
        retcode = TestCDPSCLIWithCluster.editStreamTable(chgTable1, "-compression", "zlib");
        Assert.assertEquals(0L, retcode);
        retcode = TestCDPSCLIWithCluster.editStreamTable(streamTable1, "-autocreate", "false");
        Assert.assertEquals(0L, retcode);
        retcode = TestCDPSCLIWithCluster.editStreamTable(chgTable1, "-autocreate", "false");
        Assert.assertNotEquals(0L, retcode);
        retcode = TestCDPSCLIWithCluster.editStreamTable(streamTable1, "-defaultpartitions", "4");
        Assert.assertEquals(0L, retcode);
        retcode = TestCDPSCLIWithCluster.editStreamTable(chgTable1, "-defaultpartitions", "4");
        Assert.assertNotEquals(0L, retcode);

        colChglog = new String[]{"ischangelog", "defaultpartitions", "compression", "autocreate"};
        valChglog = new String[]{"false", "4", "zlib", "false"};
        TestCDPSCLIWithCluster.verifyInfoStreamTable(streamTable1, colChglog, valChglog);
        colChglog = new String[]{"ischangelog", "defaultpartitions", "compression", "autocreate"};
        valChglog = new String[]{"true", "3", "zlib", "true"};
        TestCDPSCLIWithCluster.verifyInfoStreamTable(chgTable1, colChglog, valChglog);

        String jsonTableName1 = "/tmp/jsonTable1";
        String chglogTopicFullName1 = chgTable1 + ":jsonTable1";
        String chglogTopicName1 = "jsonTable1";
        String streamTopicName1 = streamTable1 + ":jsonTable1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTableName1);
        // Setting up the replica against the plain (non-changelog) stream must fail ...
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTableName1, streamTable1, streamTopicName1);
        Assert.assertNotEquals(0L, retcode);
        // ... while the changelog stream accepts it.
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTableName1, chgTable1, chglogTopicFullName1);
        Assert.assertEquals(0L, retcode);
        TestCDPSCLIWithCluster.verifyListCLG(jsonTableName1, chglogTopicFullName1, null, null);
        TestCDPSCLIWithCluster.verifyListTopicStreamTable(chgTable1, chglogTopicName1, null, null);
        TestCDPSCLIWithCluster.verifyInfoCLG(chglogTopicFullName1, jsonTableName1, null, null);
    }

    /**
     * Checks that a simple producer can write to a topic of a plain stream but is rejected
     * (non-zero return) when it targets a changelog topic.
     *
     * @throws Exception if any command, verification or assertion fails
     */
    @Test
    public void testNonChangelogProducer() throws Exception {
        final String plainStream = "/tmp/streamNonProd1";
        final String changelogStream = "/tmp/chgNonProd1";
        TestCDPSUtil.replaceStreamTable(plainStream, false, 4);
        TestCDPSUtil.replaceStreamTable(changelogStream, true, 4);

        verifyInfoStreamTable(
                plainStream,
                new String[]{"ischangelog", "defaultpartitions"},
                new String[]{"false", "4"});
        verifyInfoStreamTable(
                changelogStream,
                new String[]{"ischangelog", "defaultpartitions"},
                new String[]{"true", "4"});

        final String sourceTable = "/tmp/jsonNonProd1";
        final String topicName = "jsonNonProd1";
        final String changelogTopicFullName = changelogStream + ":jsonNonProd1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, sourceTable);

        int exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(sourceTable, changelogStream, changelogTopicFullName);
        Assert.assertEquals(0L, exitCode);

        exitCode = createStreamTopic(plainStream, topicName);
        Assert.assertEquals(0L, exitCode);

        // Producing to the plain stream's topic is expected to work ...
        exitCode = SimpleProducer.testSimpleProducer(plainStream + ":" + topicName, 5);
        Assert.assertEquals(0L, exitCode);

        // ... whereas producing directly to the changelog topic is expected to fail.
        exitCode = SimpleProducer.testSimpleProducer(changelogTopicFullName, 5);
        Assert.assertNotEquals(0L, exitCode);
    }

    /**
     * Checks that adding the same changelog to a table a second time fails, both through the
     * setup helper and through a direct {@code changelog add ... -useexistingtopic true}.
     *
     * @throws Exception if any command or assertion fails
     */
    @Test
    public void testDuplicateAdd() throws Exception {
        final String changelogStream = "/tmp/chgDup1";
        final String sourceTable = "/tmp/jsonDup1";
        final String changelogTopicFullName = changelogStream + ":jsonDup1";

        TestCDPSUtil.replaceStreamTable(changelogStream, true, 5);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, sourceTable);

        // First add succeeds.
        int exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(sourceTable, changelogStream, changelogTopicFullName);
        Assert.assertEquals(0L, exitCode);

        // Identical second add must be rejected.
        exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(sourceTable, changelogStream, changelogTopicFullName);
        Assert.assertNotEquals(0L, exitCode);

        // Still rejected when the caller asks to reuse the existing topic.
        final String addExistingCommand =
                "maprcli table changelog add -path " + sourceTable
                + " -changelog " + changelogTopicFullName
                + " -useexistingtopic true";
        System.out.println(addExistingCommand);
        exitCode = DBTests.ExecuteShellCmdAndGetReturnCode(addExistingCommand);
        Assert.assertNotEquals(0L, exitCode);
    }

    /**
     * Adds, removes and re-adds the same changelog several times, including once after the
     * underlying topic has been deleted.
     *
     * @throws Exception if any command, verification or assertion fails
     */
    @Test
    public void testReAdd() throws Exception {
        final String changelogStream = "/tmp/chgReAdd1";
        final String sourceTable = "/tmp/jsonReAdd1";
        final String topicName = "jsonReAdd1";
        final String changelogTopicFullName = changelogStream + ":jsonReAdd1";

        TestCDPSUtil.replaceStreamTable(changelogStream, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, sourceTable);

        // Round 1: add, confirm it is listed, remove, confirm it is gone.
        int exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(sourceTable, changelogStream, changelogTopicFullName);
        Assert.assertEquals(0L, exitCode);
        verifyListCLGExist(sourceTable, changelogTopicFullName, null, null, true);
        exitCode = cmdRemoveCLG(sourceTable, changelogTopicFullName);
        Assert.assertTrue(exitCode == 0);
        verifyListCLGExist(sourceTable, changelogTopicFullName, null, null, false);

        // Round 2: re-add, wait via TestCDPSMisc.waitForBucketRepl, remove again.
        exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(sourceTable, changelogStream, changelogTopicFullName);
        Assert.assertEquals(0L, exitCode);
        TestCDPSMisc.waitForBucketRepl(sourceTable, changelogTopicFullName);
        exitCode = cmdRemoveCLG(sourceTable, changelogTopicFullName);
        Assert.assertTrue(exitCode == 0);
        verifyListCLGExist(sourceTable, changelogTopicFullName, null, null, false);

        // Delete the topic itself; "changelog info" on it must then fail.
        exitCode = deleteStreamTopic(changelogStream, topicName);
        Assert.assertTrue(exitCode == 0);
        exitCode = cmdInfoCLG(changelogTopicFullName);
        Assert.assertTrue(exitCode != 0);

        // Round 3: adding again after the topic was deleted must succeed.
        exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(sourceTable, changelogStream, changelogTopicFullName);
        Assert.assertEquals(0L, exitCode);
        TestCDPSMisc.waitForBucketRepl(sourceTable, changelogTopicFullName);
    }

    /**
     * Calls {@code TestCDPSUtil.replaceStreamTableThrow} twice for {@code /tmp/noStream1} with 34
     * default partitions: the first variant (fourth argument {@code false}) must return non-zero;
     * after the second variant (fourth argument {@code true}) the stream must report
     * {@code autocreate=true} and {@code defaultpartitions=34}.
     *
     * @throws Exception if any command, verification or assertion fails
     */
    @Test
    public void testStreamAutoCreate() throws Exception {
        final String changelogStream = "/tmp/noStream1";

        final int firstAttempt = TestCDPSUtil.replaceStreamTableThrow(changelogStream, true, 34, false, false);
        Assert.assertTrue(firstAttempt != 0);

        // The second call's return code is not asserted; the info check below covers it.
        TestCDPSUtil.replaceStreamTableThrow(changelogStream, true, 34, true, false);

        verifyInfoStreamTable(
                changelogStream,
                new String[]{"autocreate", "defaultpartitions"},
                new String[]{"true", "34"});
    }

    /**
     * Checks that a changelog stays listed on its source table after the changelog stream has
     * been deleted, and that it can still be removed afterwards.
     *
     * @throws Exception if any command, verification or assertion fails
     */
    @Test
    public void testRemoveNoStream() throws Exception {
        final String changelogStream = "/tmp/noStream1";
        TestCDPSUtil.replaceStreamTable(changelogStream, true, 24);
        verifyInfoStreamTable(
                changelogStream,
                new String[]{"ischangelog", "defaultpartitions"},
                new String[]{"true", "24"});

        final String sourceTable = "/tmp/noJson1";
        final String topicName = "noJson1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, sourceTable);
        final String changelogTopicFullName = changelogStream + ":" + topicName;

        int exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(sourceTable, changelogStream, changelogTopicFullName);
        Assert.assertTrue(exitCode == 0);
        verifyListCLGExist(sourceTable, changelogTopicFullName, null, null, true);

        // Deleting the stream must not drop the changelog entry from the table's listing.
        exitCode = deleteStreamTable(changelogStream);
        Assert.assertTrue(exitCode == 0);
        verifyListCLGExist(sourceTable, changelogTopicFullName, null, null, true);

        // Explicit removal still works once the stream is gone.
        exitCode = cmdRemoveCLG(sourceTable, changelogTopicFullName);
        Assert.assertTrue(exitCode == 0);
        verifyListCLGExist(sourceTable, changelogTopicFullName, null, null, false);
    }

    /**
     * Runs {@code maprcli table changelog list -refreshnow true -path <srcTable> -json}, logs the
     * output, and reports whether it contains the substring {@code error}.
     *
     * @param srcTable source table whose changelogs are listed
     * @return {@code true} when the command output contains {@code "error"}
     * @throws Exception if the command cannot be run
     */
    public static boolean hasErrorCLGList(String srcTable) throws Exception {
        final String listJsonCommand =
                "maprcli table changelog list -refreshnow true -path " + srcTable + " -json";
        final String cliOutput = DBTests.ExecuteShellCmd(listJsonCommand);
        _logger.info(cliOutput);
        return cliOutput.contains("error");
    }

    /**
     * Exercises {@code -useexistingtopic} on {@code maprcli table changelog add}.
     *
     * <p>Expectations encoded below: every add exits with 0, but the changelog listing contains
     * an error when (a) the flag is used while the topic does not exist, or (b) the topic exists
     * and the flag is omitted. Only flag plus existing topic yields an error-free listing.
     *
     * @throws Exception if any command or assertion fails
     */
    @Test
    public void testUseExistingTopic() throws Exception {
        final String changelogStream = "/tmp/chgExistTopic1";
        final String sourceTable = "/tmp/jsonExistingTopic1";
        final String topicName = "jsonExistingTopic1";
        TestCDPSUtil.replaceStreamTable(changelogStream, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, sourceTable);

        final String addCommand =
                "maprcli table changelog add -path " + sourceTable
                + " -changelog " + changelogStream + ":" + topicName;
        final String addExistingCommand = addCommand + " -useexistingtopic true";

        // (a) Flag given but the topic has not been created yet.
        int exitCode = DBTests.ExecuteShellCmdAndGetReturnCode(addExistingCommand);
        Assert.assertTrue(exitCode == 0);
        boolean listingHasError = hasErrorCLGList(sourceTable);
        Assert.assertTrue(listingHasError);
        exitCode = cmdRemoveCLG(sourceTable, changelogStream + ":" + topicName);
        Assert.assertTrue(exitCode == 0);

        // (b) Topic exists, flag omitted.
        exitCode = createStreamTopic(changelogStream, topicName);
        Assert.assertTrue(exitCode == 0);
        exitCode = DBTests.ExecuteShellCmdAndGetReturnCode(addCommand);
        Assert.assertTrue(exitCode == 0);
        listingHasError = hasErrorCLGList(sourceTable);
        Assert.assertTrue(listingHasError);
        exitCode = cmdRemoveCLG(sourceTable, changelogStream + ":" + topicName);
        Assert.assertTrue(exitCode == 0);

        // (c) Topic exists and the flag is given: the listing must be clean.
        exitCode = DBTests.ExecuteShellCmdAndGetReturnCode(addExistingCommand);
        Assert.assertTrue(exitCode == 0);
        listingHasError = hasErrorCLGList(sourceTable);
        Assert.assertFalse(listingHasError);
    }

    /**
     * Checks that {@code maprcli table changelog info} reports the right source table for a
     * changelog located under {@code /tmp/chginfo} and for one located under {@code /chginfo}.
     *
     * @throws Exception if any command, verification or assertion fails
     */
    @Test
    public void testChangelogInfo() throws Exception {
        final String rootDir = "/chginfo";
        final String rootStream = "/chginfo/dst1";
        final String rootSource = "/chginfo/src1";
        final String rootTopic = "src1";
        final String tmpDir = "/tmp/chginfo";
        final String tmpStream = "/tmp/chginfo/dst2";
        final String tmpSource = "/tmp/chginfo/src2";
        final String tmpTopic = "src2";

        // Best-effort removal of leftovers; the exit codes are intentionally not checked.
        cmdRmdir(rootDir);
        cmdRmdir(tmpDir);

        int exitCode = cmdMkdir(rootDir);
        Assert.assertTrue(0 == exitCode);
        exitCode = cmdMkdir(tmpDir);
        Assert.assertTrue(0 == exitCode);

        TestCDPSUtil.replaceStreamTable(rootStream, true, 1);
        TestCDPSUtil.replaceStreamTable(tmpStream, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, rootSource);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, tmpSource);

        // Changelog under /tmp/chginfo.
        exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(tmpSource, tmpStream, tmpStream + ":" + tmpTopic);
        Assert.assertTrue(exitCode == 0);
        verifyInfoCLG(
                tmpStream + ":" + tmpTopic,
                tmpSource,
                new String[]{"table"},
                new String[]{tmpSource});

        // Changelog under /chginfo.
        exitCode = TestCDPSUtil.setupCDPSReplicaReturnCode(rootSource, rootStream, rootStream + ":" + rootTopic);
        Assert.assertTrue(exitCode == 0);
        verifyInfoCLG(
                rootStream + ":" + rootTopic,
                rootSource,
                new String[]{"table"},
                new String[]{rootSource});
    }

    /**
     * Runs the changelog CLI wrappers (setup, edit, list, pause, resume, info,
     * remove) against a matrix of bad table / stream / topic arguments and
     * asserts that each call returns a non-zero code, followed by one
     * well-formed call per command that must return zero.
     *
     * <p>Fixtures created here: {@code /tmp/validStream1} (created with the
     * flag {@code false}, reported as {@code ischangelog=false}),
     * {@code /tmp/chgStream1} (flag {@code true}, {@code ischangelog=true}),
     * and the JSON tables {@code /tmp/validJson1} and {@code /tmp/chgJson1}.
     * The {@code nonExist*} paths are never created by this method.
     *
     * @throws Exception if any helper throws
     */
    @Test
    public void testInvalidStreamTopic() throws Exception {
        // ---- fixtures ----
        String validStream1 = "/tmp/validStream1";
        String validStreamNoDir1 = "validStream1";
        TestCDPSUtil.replaceStreamTable(validStream1, false, 2);
        String[] colChglog = new String[]{"ischangelog", "defaultpartitions"};
        String[] valChglog = new String[]{"false", "2"};
        TestCDPSCLIWithCluster.verifyInfoStreamTable(validStream1, colChglog, valChglog);
        String nonExistStream1 = "/tmp/nonExistStream1";
        String chgStream1 = "/tmp/chgStream1";
        TestCDPSUtil.replaceStreamTable(chgStream1, true, 4);
        colChglog = new String[]{"ischangelog", "defaultpartitions"};
        valChglog = new String[]{"true", "4"};
        TestCDPSCLIWithCluster.verifyInfoStreamTable(chgStream1, colChglog, valChglog);
        String validJson1 = "/tmp/validJson1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, validJson1);
        String nonExistJson1 = "/tmp/nonExistJson1";
        String nonExistJsonNoDir1 = "nonExistJson1";
        String chgJson1 = "/tmp/chgJson1";
        String chgJsonNoDir1 = "chgJson1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, chgJson1);
        // Deliberately malformed name, including literal double quotes.
        String wrongStr = "\"a#$%^&\"";
        int retcode;

        // ---- changelog setup: nine invalid combinations, then the valid one ----
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(nonExistJson1, chgStream1, chgStream1 + ":" + nonExistJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(chgJson1, nonExistStream1, nonExistStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(validStream1, chgStream1, chgStream1 + ":" + validStreamNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(chgJson1, validJson1, validJson1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(chgJson1, validStream1, validStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(chgJson1, chgStream1, ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(chgJson1, chgStream1, chgStream1 + ":");
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(chgJson1, chgStream1, chgStream1 + ":" + wrongStr);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(chgJson1, chgStream1, wrongStr + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        String chgTopicFullName1 = chgStream1 + ":" + chgJsonNoDir1;
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(chgJson1, chgStream1, chgTopicFullName1);
        Assert.assertTrue(retcode == 0);

        // ---- changelog edit ----
        // NOTE(review): this first case builds the topic from the full path
        // nonExistJson1, while the matching case in every other group uses
        // nonExistJsonNoDir1 - confirm whether that difference is intended.
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(nonExistJson1, chgStream1 + ":" + nonExistJson1, true);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(chgJson1, nonExistStream1 + ":" + chgJsonNoDir1, true);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(validStream1, chgStream1 + ":" + validStreamNoDir1, true);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(chgJson1, validJson1 + ":" + chgJsonNoDir1, true);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(chgJson1, validStream1 + ":" + chgJsonNoDir1, true);
        // This result used to be overwritten without being checked; the same
        // case is asserted non-zero for setup, pause, resume, info and remove.
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(chgJson1, ":" + chgJsonNoDir1, true);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(chgJson1, chgStream1 + ":", true);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(chgJson1, chgStream1 + ":" + wrongStr, true);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(chgJson1, wrongStr + ":" + chgJsonNoDir1, true);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdEditThrottleCLG(chgJson1, chgTopicFullName1, true);
        Assert.assertTrue(retcode == 0);

        // ---- changelog list ----
        retcode = TestCDPSCLIWithCluster.cmdListCLG(nonExistJson1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdListCLG(validJson1);
        Assert.assertTrue(retcode == 0);
        retcode = TestCDPSCLIWithCluster.cmdListCLG(validStream1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdListCLG(wrongStr);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdListCLG(chgJson1);
        Assert.assertTrue(retcode == 0);

        // ---- changelog pause ----
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(nonExistJson1, chgStream1 + ":" + nonExistJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(chgJson1, nonExistStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(validStream1, chgStream1 + ":" + validStreamNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(chgJson1, validJson1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(chgJson1, validStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(chgJson1, ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(chgJson1, chgStream1 + ":");
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(chgJson1, chgStream1 + ":" + wrongStr);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(chgJson1, wrongStr + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdPauseCLG(chgJson1, chgTopicFullName1);
        Assert.assertTrue(retcode == 0);

        // ---- changelog resume ----
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(nonExistJson1, chgStream1 + ":" + nonExistJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(chgJson1, nonExistStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(validStream1, chgStream1 + ":" + validStreamNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(chgJson1, validJson1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(chgJson1, validStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(chgJson1, ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(chgJson1, chgStream1 + ":");
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(chgJson1, chgStream1 + ":" + wrongStr);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(chgJson1, wrongStr + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdResumeCLG(chgJson1, chgTopicFullName1);
        Assert.assertTrue(retcode == 0);

        // ---- changelog info (topic argument only) ----
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(chgStream1 + ":" + nonExistJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(nonExistStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(chgStream1 + ":" + validStreamNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(validJson1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(validStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(chgStream1 + ":");
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(chgStream1 + ":" + wrongStr);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(wrongStr + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdInfoCLG(chgTopicFullName1);
        Assert.assertTrue(retcode == 0);

        // ---- changelog remove: the valid removal goes last so the earlier
        // groups still have the changelog relation to operate on ----
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(nonExistJson1, chgStream1 + ":" + nonExistJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(chgJson1, nonExistStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(validStream1, chgStream1 + ":" + validStreamNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(chgJson1, validJson1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(chgJson1, validStream1 + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(chgJson1, ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(chgJson1, chgStream1 + ":");
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(chgJson1, chgStream1 + ":" + wrongStr);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(chgJson1, wrongStr + ":" + chgJsonNoDir1);
        Assert.assertTrue(retcode != 0);
        retcode = TestCDPSCLIWithCluster.cmdRemoveCLG(chgJson1, chgTopicFullName1);
        Assert.assertTrue(retcode == 0);
    }

    /**
     * Checks a change data record whose first node is expected to carry the
     * whole input document as a single {@link ChangeOp#SET}.
     *
     * <p>The record head is verified via {@code TestCDPSUtil.verifyHead} with
     * {@code ChangeOp.SET}; the first node yielded by the record's iterator is
     * then compared with {@code rec} (cast to {@link Value}), a {@code null}
     * field path and the record's own op timestamp.
     *
     * @param strRowId row id passed to the head verification
     * @param cdr      change data record under test
     * @param rec      document the first node is compared with
     * @param isDebug  when true, the record is printed before verification
     * @throws IOException declared for the verification helpers
     */
    public static void verifyColumnDataThroughIter1(String strRowId, ChangeDataRecord cdr, Document rec, boolean isDebug) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        System.out.println("-----input rec " + rec);
        TestCDPSUtil.verifyHead(cdr, true, strRowId, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.SET);
        // The node is required to carry exactly the timestamp of the record head.
        long headTimestamp = cdr.getOpTimestamp();
        Iterator entries = cdr.iterator();
        KeyValue firstEntry = (KeyValue)entries.next();
        ChangeNode firstNode = (ChangeNode)firstEntry.getValue();
        TestCDPSUtil.assertNodeEquals(firstNode, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.SET, headTimestamp, null, (Value)rec);
    }

    /**
     * Checks a {@link ChangeOp#MERGE} change data record whose first three
     * nodes are expected, in order, at the field paths {@code a.b.c},
     * {@code g} and {@code x.y.z.w}.
     *
     * <p>Node {@code i} is compared with {@code cops[i]}, the record's op
     * timestamp and the value {@code rec} holds at the corresponding path.
     *
     * @param strRowId row id passed to the head verification
     * @param cdr      change data record under test
     * @param rec      document supplying the expected value for each path
     * @param isDebug  when true, the record is printed before verification
     * @param cops     expected operation per node; elements 0..2 are read
     * @throws IOException declared for the verification helpers
     */
    public static void verifyColumnDataThroughIter2(String strRowId, ChangeDataRecord cdr, Document rec, boolean isDebug, ChangeOp[] cops) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        System.out.println("-----input rec " + rec);
        TestCDPSUtil.verifyHead(cdr, true, strRowId, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.MERGE);
        long headTimestamp = cdr.getOpTimestamp();
        Iterator entries = cdr.iterator();
        // Paths are listed in the order the nodes are pulled from the iterator.
        String[] expectedPaths = new String[]{"a.b.c", "g", "x.y.z.w"};
        for (int idx = 0; idx < expectedPaths.length; ++idx) {
            ChangeNode node = (ChangeNode)((KeyValue)entries.next()).getValue();
            TestCDPSUtil.assertNodeEquals(node, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, cops[idx], headTimestamp, expectedPaths[idx], rec.getValue(expectedPaths[idx]));
        }
    }

    /**
     * Checks a {@link ChangeOp#MERGE} change data record that is expected to
     * yield one {@link ChangeOp#SET} node per entry of {@code fieldnames}, in
     * the same order.
     *
     * <p>Each node is compared with the record's op timestamp, the field name
     * as its path, and the value {@code rec} holds at that field.
     *
     * @param strRowId   row id passed to the head verification
     * @param cdr        change data record under test
     * @param rec        document supplying the expected value for each field
     * @param isDebug    when true, the record is printed before verification
     * @param fieldnames field paths expected, in iteration order
     * @throws IOException declared for the verification helpers
     */
    public static void verifyColumnDataThroughIter3(String strRowId, ChangeDataRecord cdr, Document rec, boolean isDebug, String[] fieldnames) throws IOException {
        if (isDebug) {
            TestCDPSUtil.printCDRec(cdr);
        }
        System.out.println("-----input rec " + rec);
        TestCDPSUtil.verifyHead(cdr, true, strRowId, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, 0L, ChangeOp.MERGE);
        long headTimestamp = cdr.getOpTimestamp();
        Iterator entries = cdr.iterator();
        for (String fieldPath : fieldnames) {
            ChangeNode node = (ChangeNode)((KeyValue)entries.next()).getValue();
            TestCDPSUtil.assertNodeEquals(node, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.SET, headTimestamp, fieldPath, rec.getValue(fieldPath));
        }
    }

    /**
     * Verifies that a changelog relation delivers no change records until it
     * is resumed, and that pausing it again holds back later writes until the
     * next resume.
     *
     * <p>Sequence: 20 rows are written, the changelog is set up and reported
     * by the list command as {@code paused=true}, a fetch yields 0 records,
     * resume yields 20; after pause, 10 more writes yield 0 records, and a
     * final resume yields 10.
     *
     * <p>The table handle and the consumer are released in {@code finally} so
     * that a failing assertion does not leak them.
     *
     * @throws Exception if any helper throws
     */
    @Test
    public void testCLGAddPause() throws Exception {
        String jsrc1 = "/tmp/jsrc1";
        String chglogdst1 = "/tmp/chglogdst1";
        String topicFullName = chglogdst1 + ":jsrc1";
        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        JsonTable jsonTable = MapRDBImpl.getTable((String)jsrc1);
        KafkaConsumer<byte[], ChangeDataRecord> consumerScf = null;
        try {
            DBDocumentImpl rec = new DBDocumentImpl();
            rec.set("dummyf1", 10).set("dummyf2", 20);
            // Rows written before the changelog relation exists.
            for (int i = 0; i < 20; ++i) {
                jsonTable.insertOrReplace("row" + i, (Document)rec);
                jsonTable.flush();
            }
            // The fourth argument is presumably the "start paused" flag, since
            // the list output is checked for paused=true right after - TODO confirm.
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, true, null);
            String[] colChglog = new String[]{"paused"};
            String[] valChglog = new String[]{"true"};
            TestCDPSCLIWithCluster.verifyListCLG(jsrc1, topicFullName, colChglog, valChglog);

            consumerScf = TestCDPSUtil.startConsumer(topicFullName);
            List<ConsumerRecord<byte[], ChangeDataRecord>> getListScf = TestCDPSUtil.fetchChangeDataWithBreak(20, consumerScf, 30);
            Assert.assertEquals((long)0L, (long)getListScf.size());

            TestCDPSCLIWithCluster.resumeCLG(topicFullName, jsrc1);
            getListScf = TestCDPSUtil.fetchChangeDataWithBreak(20, consumerScf, 30);
            Assert.assertEquals((long)20L, (long)getListScf.size());

            TestCDPSCLIWithCluster.pauseCLG(topicFullName, jsrc1);
            for (int i = 0; i < 10; ++i) {
                jsonTable.insertOrReplace("row" + i, (Document)rec);
                jsonTable.flush();
            }
            getListScf = TestCDPSUtil.fetchChangeDataWithBreak(10, consumerScf, 30);
            Assert.assertEquals((long)0L, (long)getListScf.size());

            TestCDPSCLIWithCluster.resumeCLG(topicFullName, jsrc1);
            getListScf = TestCDPSUtil.fetchChangeDataWithBreak(10, consumerScf, 30);
            Assert.assertEquals((long)10L, (long)getListScf.size());
        } finally {
            // Nested so the table is closed even if closing the consumer throws.
            try {
                if (consumerScf != null) {
                    consumerScf.close();
                }
            } finally {
                jsonTable.close();
            }
        }
    }

    /**
     * Sets up changelog relations with different "previous value" arguments
     * (none given, {@code '$NONE'}, {@code '$MUTATED'}, {@code '$ROW'}, an
     * explicit field list, and a column list plus field list) and checks what
     * the changelog list and info commands report for each.
     *
     * <p>The same six cases are run twice: first against tables created with
     * {@code replaceScfJsonTable} and the stream {@code /tmp/chgAddPrev1},
     * then against tables created with {@code replaceJsonTableWithCFMap}
     * (column-family-to-path map {@code cfPath}) and {@code /tmp/chgAddPrev2}.
     *
     * <p>{@code colChglogsrc}/{@code valChglogsrc} are reassigned between
     * cases, so statement order matters.
     *
     * @throws Exception if any helper throws
     */
    @Test
    public void testAddPrevValue() throws Exception {
        String colFields = "a,b.c,d.e.f";
        String prevColFields = "a.m1,a.m2,b.c,d.e,x.y.z";
        int retcode = 0;
        String[] colChglogsrc = null;
        String[] valChglogsrc = null;
        String[] colChglogdst = null;
        String[] valChglogdst = null;
        // ---- first pass: tables created with replaceScfJsonTable ----
        String chgTablePrev1 = "/tmp/chgAddPrev1";
        String jsonTableNoPrev1 = "/tmp/jsonAddNoPrev1";
        String jsonTablePrevNone1 = "/tmp/jsonAddPrevNone1";
        String jsonTablePrevMutated1 = "/tmp/jsonAddPrevMutated1";
        String jsonTablePrevRow1 = "/tmp/jsonAddPrevRow1";
        String jsonTablePrevColumns1 = "/tmp/jsonAddPrevColumns1";
        String jsonTableColPrevColumns1 = "/tmp/jsonAddColPrevColumns1";
        // The chglogTopicName*1 locals below are not read anywhere in this method.
        String chglogTopicNameNoPrev1 = "jsonAddNoPrev1";
        String chglogTopicNamePrevNone1 = "jsonAddPrevNone1";
        String chglogTopicNamePrevMutated1 = "jsonAddPrevMutated1";
        String chglogTopicNamePrevRow1 = "jsonAddPrevRow1";
        String chglogTopicNamePrevColumns1 = "jsonAddPrevColumns1";
        String chglogTopicNameColPrevColumns1 = "jsonAddColPrevColumns1";
        String chglogTopicFullNameNoPrev1 = chgTablePrev1 + ":jsonAddNoPrev1";
        String chglogTopicFullNamePrevNone1 = chgTablePrev1 + ":jsonAddPrevNone1";
        String chglogTopicFullNamePrevMutated1 = chgTablePrev1 + ":jsonAddPrevMutated1";
        String chglogTopicFullNamePrevRow1 = chgTablePrev1 + ":jsonAddPrevRow1";
        String chglogTopicFullNamePrevColumns1 = chgTablePrev1 + ":jsonAddPrevColumns1";
        String chglogTopicFullNameColPrevColumns1 = chgTablePrev1 + ":jsonAddColPrevColumns1";
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTableNoPrev1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTablePrevNone1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTablePrevMutated1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTablePrevRow1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTablePrevColumns1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsonTableColPrevColumns1);
        TestCDPSUtil.replaceStreamTable(chgTablePrev1, true, 1);
        // Case: no previous-value argument; both expectation arrays are still null here.
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTableNoPrev1, chgTablePrev1, chglogTopicFullNameNoPrev1);
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTableNoPrev1, chglogTopicFullNameNoPrev1, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNameNoPrev1, jsonTableNoPrev1, colChglogsrc, valChglogsrc, true);
        // NOTE(review): for the next three cases valChglogsrc is set while
        // colChglogsrc is still null (it is first assigned further down, before
        // the explicit-field-list case). The second pass sets colChglogsrc to
        // {"includeColumns"} before the equivalent cases - confirm whether the
        // verify helpers skip the value check when the column array is null.
        // Case: '$NONE'
        valChglogsrc = new String[]{"$NONE"};
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePrevNone1, chgTablePrev1, chglogTopicFullNamePrevNone1, "'$NONE'");
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTablePrevNone1, chglogTopicFullNamePrevNone1, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNamePrevNone1, jsonTablePrevNone1, colChglogsrc, valChglogsrc, true);
        // Case: '$MUTATED'
        valChglogsrc = new String[]{"$MUTATED"};
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePrevMutated1, chgTablePrev1, chglogTopicFullNamePrevMutated1, "'$MUTATED'");
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTablePrevMutated1, chglogTopicFullNamePrevMutated1, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNamePrevMutated1, jsonTablePrevMutated1, colChglogsrc, valChglogsrc, true);
        // Case: '$ROW'
        valChglogsrc = new String[]{"$ROW"};
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePrevRow1, chgTablePrev1, chglogTopicFullNamePrevRow1, "'$ROW'");
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTablePrevRow1, chglogTopicFullNamePrevRow1, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNamePrevRow1, jsonTablePrevRow1, colChglogsrc, valChglogsrc, true);
        // Case: explicit field list; "includeColumns" is expected to equal prevColFields.
        colChglogsrc = new String[]{"includeColumns"};
        valChglogsrc = new String[]{prevColFields};
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePrevColumns1, chgTablePrev1, chglogTopicFullNamePrevColumns1, prevColFields);
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTablePrevColumns1, chglogTopicFullNamePrevColumns1, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNamePrevColumns1, jsonTablePrevColumns1, colChglogsrc, valChglogsrc, true);
        // Case: column list plus field list. The list check uses both
        // "Columns" and "includeColumns"; the info check uses only "includeColumns".
        colChglogsrc = new String[]{"Columns", "includeColumns"};
        valChglogsrc = new String[]{colFields, prevColFields};
        colChglogdst = new String[]{"includeColumns"};
        valChglogdst = new String[]{prevColFields};
        TestCDPSUtil.setupCDPSReplicaWithColumns(jsonTableColPrevColumns1, chgTablePrev1, chglogTopicFullNameColPrevColumns1, false, colFields, prevColFields);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTableColPrevColumns1, chglogTopicFullNameColPrevColumns1, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNameColPrevColumns1, jsonTableColPrevColumns1, colChglogdst, valChglogdst, true);
        // ---- second pass: tables created with a column-family map ----
        String chgTablePrev2 = "/tmp/chgAddPrev2";
        String jsonTableNoPrev2 = "/tmp/jsonAddNoPrev2";
        String jsonTablePrevNone2 = "/tmp/jsonAddPrevNone2";
        String jsonTablePrevMutated2 = "/tmp/jsonAddPrevMutated2";
        String jsonTablePrevRow2 = "/tmp/jsonAddPrevRow2";
        String jsonTablePrevColumns2 = "/tmp/jsonAddPrevColumns2";
        String jsonTableColPrevColumns2 = "/tmp/jsonAddColPrevColumns2";
        // The chglogTopicName*2 locals below are not read anywhere in this method.
        String chglogTopicNameNoPrev2 = "jsonAddNoPrev2";
        String chglogTopicNamePrevNone2 = "jsonAddPrevNone2";
        String chglogTopicNamePrevMutated2 = "jsonAddPrevMutated2";
        String chglogTopicNamePrevRow2 = "jsonAddPrevRow2";
        String chglogTopicNamePrevColumns2 = "jsonAddPrevColumns2";
        String chglogTopicNameColPrevColumns2 = "jsonAddColPrevColumns2";
        String chglogTopicFullNameNoPrev2 = chgTablePrev2 + ":jsonAddNoPrev2";
        String chglogTopicFullNamePrevNone2 = chgTablePrev2 + ":jsonAddPrevNone2";
        String chglogTopicFullNamePrevMutated2 = chgTablePrev2 + ":jsonAddPrevMutated2";
        String chglogTopicFullNamePrevRow2 = chgTablePrev2 + ":jsonAddPrevRow2";
        String chglogTopicFullNamePrevColumns2 = chgTablePrev2 + ":jsonAddPrevColumns2";
        String chglogTopicFullNameColPrevColumns2 = chgTablePrev2 + ":jsonAddColPrevColumns2";
        // Column family name -> JSON path, handed to replaceJsonTableWithCFMap.
        HashMap<String, String> cfPath = new HashMap<String, String>();
        cfPath.put("cf2", "a");
        cfPath.put("cf3", "a.m1");
        cfPath.put("cf4", "d");
        cfPath.put("cf5", "d.e");
        cfPath.put("cf6", "x");
        cfPath.put("cf7", "notindata");
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, jsonTableNoPrev2, cfPath);
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, jsonTablePrevNone2, cfPath);
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, jsonTablePrevMutated2, cfPath);
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, jsonTablePrevRow2, cfPath);
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, jsonTablePrevColumns2, cfPath);
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, jsonTableColPrevColumns2, cfPath);
        TestCDPSUtil.replaceStreamTable(chgTablePrev2, true, 1);
        // Case: no previous-value argument; here "includeColumns" is expected to be "$NONE".
        colChglogsrc = new String[]{"includeColumns"};
        valChglogsrc = new String[]{"$NONE"};
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTableNoPrev2, chgTablePrev2, chglogTopicFullNameNoPrev2);
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTableNoPrev2, chglogTopicFullNameNoPrev2, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNameNoPrev2, jsonTableNoPrev2, colChglogsrc, valChglogsrc, true);
        // Case: '$NONE' (same expectation arrays as the previous case)
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePrevNone2, chgTablePrev2, chglogTopicFullNamePrevNone2, "'$NONE'");
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTablePrevNone2, chglogTopicFullNamePrevNone2, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNamePrevNone2, jsonTablePrevNone2, colChglogsrc, valChglogsrc, true);
        // Case: '$MUTATED'
        valChglogsrc = new String[]{"$MUTATED"};
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePrevMutated2, chgTablePrev2, chglogTopicFullNamePrevMutated2, "'$MUTATED'");
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTablePrevMutated2, chglogTopicFullNamePrevMutated2, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNamePrevMutated2, jsonTablePrevMutated2, colChglogsrc, valChglogsrc, true);
        // Case: '$ROW'
        valChglogsrc = new String[]{"$ROW"};
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePrevRow2, chgTablePrev2, chglogTopicFullNamePrevRow2, "'$ROW'");
        Assert.assertEquals((long)0L, (long)retcode);
        TestCDPSCLIWithCluster.verifyListCLGExist(jsonTablePrevRow2, chglogTopicFullNamePrevRow2, colChglogsrc, valChglogsrc, true);
        TestCDPSCLIWithCluster.verifyInfoCLGDebug(chglogTopicFullNamePrevRow2, jsonTablePrevRow2, colChglogsrc, valChglogsrc, true);
        // Case: explicit field list. In this pass the reported value is read
        // back via the *Json helpers and compared with jsonPathStrEquals
        // instead of the verify* helpers used in the first pass.
        colChglogsrc = new String[]{"includeColumns"};
        valChglogsrc = new String[]{prevColFields};
        retcode = TestCDPSUtil.setupCDPSReplicaReturnCode(jsonTablePrevColumns2, chgTablePrev2, chglogTopicFullNamePrevColumns2, prevColFields);
        Assert.assertEquals((long)0L, (long)retcode);
        String[] colPrevCols = TestCDPSCLIWithCluster.cmdListCLGJson(jsonTablePrevColumns2, colChglogsrc);
        Assert.assertEquals((Object)true, (Object)TestCDPSUtil.jsonPathStrEquals(prevColFields, colPrevCols[0]));
        colPrevCols = TestCDPSCLIWithCluster.cmdInfoCLGJson(chglogTopicFullNamePrevColumns2, colChglogsrc);
        Assert.assertEquals((Object)true, (Object)TestCDPSUtil.jsonPathStrEquals(prevColFields, colPrevCols[0]));
        // Case: column list plus field list; the list output is indexed in the
        // order of colChglogsrc ("Columns" first, then "includeColumns").
        colChglogsrc = new String[]{"Columns", "includeColumns"};
        valChglogsrc = new String[]{colFields, prevColFields};
        colChglogdst = new String[]{"includeColumns"};
        valChglogdst = new String[]{prevColFields};
        TestCDPSUtil.setupCDPSReplicaWithColumns(jsonTableColPrevColumns2, chgTablePrev2, chglogTopicFullNameColPrevColumns2, false, colFields, prevColFields);
        colPrevCols = TestCDPSCLIWithCluster.cmdListCLGJson(jsonTableColPrevColumns2, colChglogsrc);
        Assert.assertEquals((Object)true, (Object)TestCDPSUtil.jsonPathStrEquals(colFields, colPrevCols[0]));
        Assert.assertEquals((Object)true, (Object)TestCDPSUtil.jsonPathStrEquals(prevColFields, colPrevCols[1]));
        colPrevCols = TestCDPSCLIWithCluster.cmdInfoCLGJson(chglogTopicFullNameColPrevColumns2, colChglogdst);
        Assert.assertEquals((Object)true, (Object)TestCDPSUtil.jsonPathStrEquals(prevColFields, colPrevCols[0]));
    }

    /**
     * Verifies that a changelog relation set up with the column list
     * {@code a.b.c,g,x.y.z.w} delivers change records matching the documents
     * collected in {@code presetValuesScf}.
     *
     * <p>Rows 1-3 are written before the relation is set up; the first fetch
     * asks for 2 records, checked against row1 and row3. Rows 4-6 are then
     * inserted and rows 4 and 6 updated; the second fetch asks for 5 records,
     * checked in that order.
     *
     * <p>{@code presetValuesScf} layout: index 0 row1, 1 row3, 2 row4, 3 row5,
     * 4 row6, 5 row4 update, 6 row6 update.
     *
     * <p>The table handle and the consumer are released in {@code finally} so
     * that a failing assertion does not leak them.
     *
     * @throws Exception if any helper throws
     */
    @Test
    public void testCLGAddColumns() throws Exception {
        ArrayList<DBDocumentImpl> presetValuesScf = new ArrayList<DBDocumentImpl>();
        String jsrc1 = "/tmp/jcolsrc1";
        String chglogdst1 = "/tmp/chglogcoldst1";
        String topicFullName = chglogdst1 + ":jcolsrc1";
        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        String rowid = "row1";
        DBDocumentImpl rec = new DBDocumentImpl();
        rec.set("a.b.c.f1", "abcv1").set("a.b.c.d.f1", "abcdv1").set("a.e.f1", "aev1").set("g.f1", "gv1").set("h.f1", "gv1").set("c.d.f1", "cdv1").set("x.y.z.w.f1", "xyzwv1").set("x.y.z.f1", "xyzv1");
        JsonTable jsonTable = MapRDBImpl.getTable((String)jsrc1);
        KafkaConsumer<byte[], ChangeDataRecord> consumerScf = null;
        try {
            // ---- rows written before the changelog relation exists ----
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            // After the insert the same object is emptied and refilled with
            // only the fields under the configured column paths.
            rec.empty();
            rec.set("a.b.c.f1", "abcv1").set("a.b.c.d.f1", "abcdv1").set("g.f1", "gv1").set("x.y.z.w.f1", "xyzwv1");
            presetValuesScf.add(rec);
            // row2 has no field under the configured paths and gets no preset entry.
            rowid = "row2";
            rec = new DBDocumentImpl();
            rec.set("m.n.f2", "mnv2").set("p.q.r.f2", "pqrv2").set("h.f2", "pqrv2");
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            rowid = "row3";
            rec = new DBDocumentImpl();
            rec.set("a.b.c.f3", "abcv3").set("g.f3", "gv3");
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            presetValuesScf.add(rec);

            String replPaths = "a.b.c,g,x.y.z.w";
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, replPaths);
            TestCDPSCLIWithCluster.verifyInfoCLG(topicFullName, jsrc1, null, null);
            String[] colChglog = new String[]{"Columns"};
            String[] valChglog = new String[]{"a.b.c,g,x.y.z.w"};
            // NOTE(review): testCLGAddPause passes the full topic name as the
            // second argument of verifyListCLG; here the stream path is passed.
            // Kept as is - confirm which form the helper expects.
            TestCDPSCLIWithCluster.verifyListCLG(jsrc1, chglogdst1, colChglog, valChglog);

            // ---- first fetch: 2 records, checked against row1 and row3 ----
            List<ConsumerRecord<byte[], ChangeDataRecord>> getListScf = null;
            ConsumerRecord<byte[], ChangeDataRecord> crec = null;
            ChangeDataRecord cdr = null;
            Document putrec = null;
            consumerScf = TestCDPSUtil.startConsumer(topicFullName);
            getListScf = TestCDPSUtil.fetchChangeData(2, consumerScf);
            int putId = 0;
            boolean isDebug = true;
            crec = getListScf.get(putId);
            cdr = (ChangeDataRecord)crec.value();
            putrec = (Document)presetValuesScf.get(putId);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1("row1", cdr, putrec, isDebug);
            crec = getListScf.get(++putId);
            cdr = (ChangeDataRecord)crec.value();
            putrec = (Document)presetValuesScf.get(putId);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1("row3", cdr, putrec, isDebug);

            // ---- writes made after the relation exists ----
            rowid = "row4";
            rec = new DBDocumentImpl();
            rec.set("a.b.c.f4", "abcv4").set("a.b.c.d.f4", "abcdv4").set("a.e.f4", "aev4").set("g.f4", "gv4").set("h.f4", "gv4").set("c.d.f4", "cdv4").set("x.y.z.w.f4", "xyzwv4").set("x.y.z.f4", "xyzv4");
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            rec.empty();
            rec.set("a.b.c.f4", "abcv4").set("a.b.c.d.f4", "abcdv4").set("g.f4", "gv4").set("x.y.z.w.f4", "xyzwv4");
            presetValuesScf.add(rec);
            rowid = "row5";
            rec = new DBDocumentImpl();
            rec.set("m.n.f5", "mnv5").set("p.q.r.f5", "pqrv5").set("h.f5", "pqrv5");
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            // row5 has none of the configured paths: the preset entry holds
            // nulls, and the record is later checked with DELETE for all three.
            rec.empty();
            rec.setNull("a.b.c").setNull("g").setNull("x.y.z.w");
            presetValuesScf.add(rec);
            rowid = "row6";
            rec = new DBDocumentImpl();
            rec.set("a.b.c.f6", "abcv6").set("x.y.z.w.f6", "xyzwv6");
            jsonTable.insertOrReplace(rowid, (Document)rec);
            jsonTable.flush();
            rec.empty();
            rec.set("a.b.c.f6", "abcv6").setNull("g").set("x.y.z.w.f6", "xyzwv6");
            presetValuesScf.add(rec);
            DocumentMutation mutation = null;
            rowid = "row4";
            mutation = MapRDBImpl.newMutation();
            mutation.set("a.b.c.f4A", "abcv4A").set("c.d.f4A", "c.dv4A");
            jsonTable.update(rowid, mutation);
            jsonTable.flush();
            rec = new DBDocumentImpl();
            rec.empty();
            rec.set("a.b.c.f4A", "abcv4A");
            presetValuesScf.add(rec);
            rowid = "row6";
            mutation = MapRDBImpl.newMutation();
            mutation.set("g.map.f6A", "gmapv6A").set("x.y.z.w.f6A", "xyzwv6A");
            jsonTable.update(rowid, mutation);
            jsonTable.flush();
            rec = new DBDocumentImpl();
            rec.empty();
            rec.set("g.map.f6A", "gmapv6A").set("x.y.z.w.f6A", "xyzwv6A");
            presetValuesScf.add(rec);
            for (int i = 0; i < presetValuesScf.size(); ++i) {
                System.out.println("input rec" + i + ": " + presetValuesScf.get(i));
            }

            // ---- second fetch: 5 records; the matching preset entries start
            // at index 2, hence the putId + 2 offset ----
            getListScf = TestCDPSUtil.fetchChangeData(5, consumerScf);
            putId = 0;
            crec = getListScf.get(putId);
            cdr = (ChangeDataRecord)crec.value();
            putrec = (Document)presetValuesScf.get(putId + 2);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter2("row4", cdr, putrec, isDebug, new ChangeOp[]{ChangeOp.SET, ChangeOp.SET, ChangeOp.SET});
            crec = getListScf.get(++putId);
            cdr = (ChangeDataRecord)crec.value();
            putrec = (Document)presetValuesScf.get(putId + 2);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter2("row5", cdr, putrec, isDebug, new ChangeOp[]{ChangeOp.DELETE, ChangeOp.DELETE, ChangeOp.DELETE});
            crec = getListScf.get(++putId);
            cdr = (ChangeDataRecord)crec.value();
            putrec = (Document)presetValuesScf.get(putId + 2);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter2("row6", cdr, putrec, isDebug, new ChangeOp[]{ChangeOp.SET, ChangeOp.DELETE, ChangeOp.SET});
            crec = getListScf.get(++putId);
            cdr = (ChangeDataRecord)crec.value();
            putrec = (Document)presetValuesScf.get(putId + 2);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter3("row4", cdr, putrec, isDebug, new String[]{"a.b.c.f4A"});
            crec = getListScf.get(++putId);
            cdr = (ChangeDataRecord)crec.value();
            putrec = (Document)presetValuesScf.get(putId + 2);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter3("row6", cdr, putrec, isDebug, new String[]{"g.map.f6A", "x.y.z.w.f6A"});
        } finally {
            // Nested so the table is closed even if closing the consumer throws.
            try {
                if (consumerScf != null) {
                    consumerScf.close();
                }
            } finally {
                jsonTable.close();
            }
        }
    }

    /**
     * Checks that change records keep arriving on the changelog topic while the
     * column-family layout of the source JSON table is modified.
     *
     * <p>Sequence exercised:
     * <ol>
     *   <li>create the table with family {@code cf2} mapped to {@code c.d}, insert
     *       {@code row1}, then attach the changelog and verify {@code row1};</li>
     *   <li>add family {@code cf3} on {@code e.f}, insert {@code row2} and verify it;</li>
     *   <li>replace the descriptor of {@code cf2} with one named {@code cf22} on the same
     *       path, insert {@code row3} and verify it;</li>
     *   <li>insert {@code row4} and {@code row5} back to back and verify both, in order.</li>
     * </ol>
     *
     * @throws Exception if any cluster operation or verification fails
     */
    @Test
    public void testCLGChangeColumns() throws Exception {
        final boolean isDebug = true;
        final String jsrc1 = "/tmp/jcolsrc2";
        final String chglogdst1 = "/tmp/chglogcoldst2";
        final String topicFullName = chglogdst1 + ":jcolsrc2";

        TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
        HashMap<String, String> cfPath = new HashMap<String, String>();
        cfPath.put("cf2", "c.d");
        TestCDPSUtil.replaceJsonTableWithCFMap(testAdmin, jsrc1, cfPath);

        JsonTable jsonTable = MapRDBImpl.getTable(jsrc1);
        // try/finally so the table handle is released even when a verification fails.
        try {
            // row1 is written before the changelog is attached; it is still expected
            // to be the first record read from the topic.
            String rowid = "row1";
            Document rec = new DBDocumentImpl();
            rec.set("a.b.m1.data1", "abmd1").set("c.d.m1.int1", 101);
            jsonTable.insertOrReplace(rowid, rec);
            jsonTable.flush();

            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, null);
            TestCDPSCLIWithCluster.verifyInfoCLG(topicFullName, jsrc1, null, null);
            System.out.println("input " + rowid + ": " + rec);

            // NOTE(review): the consumer is never closed in this test; confirm whether
            // TestCDPSUtil owns its lifecycle.
            KafkaConsumer<byte[], ChangeDataRecord> consumerScf = TestCDPSUtil.startConsumer(topicFullName);
            List<ConsumerRecord<byte[], ChangeDataRecord>> getListScf = TestCDPSUtil.fetchChangeData(1, consumerScf);
            ChangeDataRecord cdr = getListScf.get(0).value();
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, rec, isDebug);

            // --- add a new column family (cf3 -> e.f) and write a row that touches it ---
            FieldPath cf3Path = FieldPath.parseFrom("e.f");
            FamilyDescriptor familyDesc = MapRDBImpl.newFamilyDescriptor()
                    .setName("cf3")
                    .setJsonFieldPath(cf3Path)
                    .setCompression(FamilyDescriptor.Compression.None);
            Path jsrcPath = new Path(jsrc1);
            testAdmin.addFamily(jsrcPath, familyDesc);

            rowid = "row2";
            rec = new DBDocumentImpl();
            rec.set("a.b.m1.data2", "abm1d2").set("c.d.m1.int2", 102).set("e.f.str2", "efs2");
            jsonTable.insertOrReplace(rowid, rec);
            jsonTable.flush();
            System.out.println("input " + rowid + ": " + rec);
            getListScf = TestCDPSUtil.fetchChangeData(1, consumerScf);
            cdr = getListScf.get(0).value();
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, rec, isDebug);
            System.out.println("--- Passed the CF add test ---");

            // --- alter cf2: same path c.d, new descriptor name cf22 ---
            TestCDPSCLIWithCluster.verifyTableCFList(jsrc1, "c.d", new String[]{"cfname"}, new String[]{"cf2"}, true);
            FieldPath cf2Path = FieldPath.parseFrom("c.d");
            familyDesc = MapRDBImpl.newFamilyDescriptor()
                    .setName("cf22")
                    .setJsonFieldPath(cf2Path)
                    .setCompression(FamilyDescriptor.Compression.None);
            testAdmin.alterFamily(jsrcPath, "cf2", familyDesc);
            TestCDPSCLIWithCluster.verifyTableCFList(jsrc1, "c.d", new String[]{"cfname"}, new String[]{"cf22"}, true);

            rowid = "row3";
            rec = new DBDocumentImpl();
            rec.set("a.b.m1.data3", "abm1d3").set("c.d.m1.int3", 103).set("e.f.str3", "efs3");
            jsonTable.insertOrReplace(rowid, rec);
            jsonTable.flush();
            System.out.println("input " + rowid + ": " + rec);
            getListScf = TestCDPSUtil.fetchChangeData(1, consumerScf);
            cdr = getListScf.get(0).value();
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, rec, isDebug);
            // This step follows alterFamily, so the message names the alter step
            // (it previously repeated the "CF add" text).
            System.out.println("--- Passed the CF alter test ---");

            // --- two inserts flushed together; both records are expected in insert order ---
            rowid = "row4";
            rec = new DBDocumentImpl();
            rec.set("a.b.m1.data4", "abm1d4").set("c.d.m1.int4", 104).set("e.f.str4", "efs4");
            jsonTable.insertOrReplace(rowid, rec);
            System.out.println("input " + rowid + ": " + rec);

            String rowid2 = "row5";
            Document rec2 = new DBDocumentImpl();
            rec2.set("a.b.m1.data5", "abm1d5").set("c.d.m1.int5", 105);
            jsonTable.insertOrReplace(rowid2, rec2);
            jsonTable.flush();
            System.out.println("input " + rowid2 + ": " + rec2);

            TestCDPSCLIWithCluster.verifyTableCFList(jsrc1, "e.f", new String[]{"cfname"}, new String[]{"cf3"}, true);

            getListScf = TestCDPSUtil.fetchChangeData(2, consumerScf);
            cdr = getListScf.get(0).value();
            TestCDPSUtil.printCDRec(cdr);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, rec, isDebug);
            cdr = getListScf.get(1).value();
            TestCDPSUtil.printCDRec(cdr);
            TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid2, cdr, rec2, isDebug);
        } finally {
            jsonTable.close();
        }
    }

    /**
     * Inserts 1000 rows into a source table with a changelog attached, then reads
     * 1000 change records back from the topic and verifies each one.
     *
     * <p>Records are matched by the row id carried in the change record, so the check
     * does not depend on the order in which records are returned.
     *
     * @throws Exception if any cluster operation or verification fails
     */
    @Test
    public void testStress1() throws Exception {
        final int rowCount = 1000;
        final boolean isDebug = false;
        final String rowPrefix = "row";
        final String jsrc1 = "/tmp/jstresssrc1";
        final String chglogdst1 = "/tmp/chglogstressdst1";
        final String topicFullName = chglogdst1 + ":jstresssrc1";

        TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrc1);
        JsonTable jsonTable = MapRDBImpl.getTable(jsrc1);
        // try/finally so the table handle is released even when a verification fails.
        try {
            TestCDPSUtil.replaceStreamTable(chglogdst1, true, 1);
            TestCDPSUtil.setupCDPSReplicaWithColumns(jsrc1, chglogdst1, topicFullName, false, null);
            TestCDPSCLIWithCluster.verifyInfoCLG(topicFullName, jsrc1, null, null);
            KafkaConsumer<byte[], ChangeDataRecord> consumerScf = TestCDPSUtil.startConsumer(topicFullName);

            for (int i = 0; i < rowCount; ++i) {
                Document rec = new DBDocumentImpl();
                rec.set("a.b.m1.data", "abm1d" + i).set("c.d.m1.int", i).set("e.f.str", "efs" + i);
                jsonTable.insertOrReplace(rowPrefix + i, rec);
            }
            // NOTE(review): unlike testCLGChangeColumns, no flush() is issued before
            // fetching; confirm that relying on implicit flushing is intentional.

            List<ConsumerRecord<byte[], ChangeDataRecord>> getListScf =
                    TestCDPSUtil.fetchChangeData(rowCount, consumerScf);
            for (int i = 0; i < rowCount; ++i) {
                ChangeDataRecord cdr = getListScf.get(i).value();
                String rowid = cdr.getId().getString();
                // Rebuild the expected document from the numeric suffix of the row id.
                String idx = rowid.substring(rowPrefix.length());
                Document expected = new DBDocumentImpl();
                expected.set("a.b.m1.data", "abm1d" + idx)
                        .set("c.d.m1.int", Integer.parseInt(idx))
                        .set("e.f.str", "efs" + idx);
                TestCDPSCLIWithCluster.verifyColumnDataThroughIter1(rowid, cdr, expected, isDebug);
            }
        } finally {
            jsonTable.close();
        }
    }

    /**
     * Runs four {@link JsonLoader} threads against one source table while a single
     * {@link JsonConsumer} thread is attached to the changelog topic.
     *
     * <p>Each loader is constructed with a 500-wide slice of {@code [0, 2000)}
     * (presumably a row-id range; confirm against JsonLoader) and the consumer is
     * constructed with the total of 2000. The consumer thread is started first and
     * joined last.
     *
     * @throws Exception if setup fails or a join is interrupted
     */
    @Test
    public void testStress2() throws Exception {
        final int totalRows = 2000;
        final int loaderCount = 4;
        final int sliceSize = 500;
        final String srcTablePath = "/tmp/jstresssrc2";
        final String streamPath = "/tmp/chglogstressdst2";
        final String topic = streamPath + ":jstresssrc2";

        TestCDPSUtil.replaceScfJsonTable(testAdmin, srcTablePath);
        final JsonTable table = MapRDBImpl.getTable(srcTablePath);
        TestCDPSUtil.replaceStreamTable(streamPath, true, 1);
        TestCDPSUtil.setupCDPSReplicaWithColumns(srcTablePath, streamPath, topic, false, null);
        TestCDPSCLIWithCluster.verifyInfoCLG(topic, srcTablePath, null, null);

        // Build the workers: the consumer first, then loaders for slices
        // [0,500), [500,1000), [1000,1500), [1500,2000).
        final JsonConsumer consumer = new JsonConsumer(topic, totalRows);
        final JsonLoader[] loaders = new JsonLoader[loaderCount];
        for (int k = 0; k < loaderCount; ++k) {
            loaders[k] = new JsonLoader(k * sliceSize, (k + 1) * sliceSize, (Table)table, topic);
        }

        // Wrap them in threads named consumer1 and loader21..loader24.
        final Thread consumerThread = new Thread((Runnable)consumer, "consumer1");
        final Thread[] loaderThreads = new Thread[loaderCount];
        for (int k = 0; k < loaderCount; ++k) {
            loaderThreads[k] = new Thread((Runnable)loaders[k], "loader2" + (k + 1));
        }

        consumerThread.start();
        for (Thread loaderThread : loaderThreads) {
            loaderThread.start();
        }
        for (Thread loaderThread : loaderThreads) {
            loaderThread.join();
        }
        consumerThread.join();
        table.close();
    }

    /**
     * Attaches 200 source tables to 100 changelog streams (two tables per stream),
     * runs one {@link JsonLoader} thread per table to completion, and afterwards runs
     * one {@link JsonConsumer} thread per topic to completion.
     *
     * <p>Table {@code i} is paired with stream {@code i % 100}; its topic is
     * {@code <stream path>:jap<i>}.
     *
     * @throws Exception if setup fails or a join is interrupted
     */
    @Test
    public void testAssignProducer() throws Exception {
        final int rowsPerTable = 2000;
        final int tableTotal = 200;
        final int streamTotal = 100;
        final String baseDir = "/tmp/assignprod/";

        // The rmdir result is discarded; only the mkdir status is asserted.
        TestCDPSCLIWithCluster.cmdRmdir(baseDir);
        final int mkdirStatus = TestCDPSCLIWithCluster.cmdMkdir(baseDir);
        Assert.assertTrue(mkdirStatus == 0);

        final String[] tablePaths = new String[tableTotal];
        final String[] streamPaths = new String[streamTotal];
        final String[] topics = new String[tableTotal];
        final Table[] tables = new Table[tableTotal];

        for (int s = 0; s < streamTotal; ++s) {
            streamPaths[s] = baseDir + "chgap" + s;
            TestCDPSUtil.replaceStreamTable(streamPaths[s], true, 8);
        }
        _logger.info("---1. Changelog streams are created ---");

        for (int t = 0; t < tableTotal; ++t) {
            tablePaths[t] = baseDir + "jap" + t;
            topics[t] = streamPaths[t % streamTotal] + ":jap" + t;
            tables[t] = TestCDPSUtil.replaceScfJsonTable(testAdmin, tablePaths[t]);
        }
        _logger.info("---2. Source json tables are created ---");

        for (int t = 0; t < tableTotal; ++t) {
            TestCDPSUtil.setupCDPSReplicaWithColumns(
                    tablePaths[t], streamPaths[t % streamTotal], topics[t], false, null);
        }
        _logger.info("---3. Changelog relationships are set ---");

        final List<JsonLoader> loaders = new ArrayList<JsonLoader>();
        for (int t = 0; t < tableTotal; ++t) {
            loaders.add(new JsonLoader(0, rowsPerTable, tables[t], topics[t]));
        }
        _logger.info("---4. JsonLoaders are created ---");

        final List<Thread> loaderThreads = new ArrayList<Thread>();
        for (int t = 0; t < tableTotal; ++t) {
            loaderThreads.add(new Thread((Runnable)loaders.get(t), "loader" + t));
        }
        _logger.info("---5. Threads for JsonLoaders created ---");

        for (Thread worker : loaderThreads) {
            worker.start();
        }
        _logger.info("---6. Threads for JsonLoaders started ---");

        for (Thread worker : loaderThreads) {
            worker.join();
        }
        _logger.info("---7. Threads for JsonLoaders finished ---");

        // Consumers are only created once every loader has finished.
        final List<JsonConsumer> consumers = new ArrayList<JsonConsumer>();
        for (String topic : topics) {
            consumers.add(new JsonConsumer(topic, rowsPerTable));
        }
        _logger.info("---8. JsonConsumers are created ---");

        final List<Thread> consumerThreads = new ArrayList<Thread>();
        for (int t = 0; t < tableTotal; ++t) {
            consumerThreads.add(new Thread((Runnable)consumers.get(t), "consumer" + t));
        }
        _logger.info("---9. Threads for JsonConsumers are created ---");

        for (Thread worker : consumerThreads) {
            worker.start();
        }
        _logger.info("---10. Threads for JsonConsumers are started ---");

        for (Thread worker : consumerThreads) {
            worker.join();
        }
        _logger.info("---11. Threads for JsonConsumers are finished ---");

        for (Table table : tables) {
            table.close();
        }
    }

    /**
     * Loads 100 source tables (attached to 50 changelog streams) while the ted
     * {@code fakeAppend} switch is enabled, then runs
     * {@code maprcli table changelog list -refreshnow true} against five randomly
     * chosen tables and fails if any output contains {@code "error"}.
     *
     * <p>Once {@code fakeAppend} has been enabled, the tables are closed and the switch
     * is disabled in a {@code finally} block, so a failing assertion or an interrupted
     * join no longer leaves the switch enabled for subsequent tests.
     *
     * @throws IOException if the ted binary was not found, or enabling/disabling
     *         {@code fakeAppend} returns a non-zero exit status
     * @throws Exception if any other cluster operation fails
     */
    @Test
    public void testFakeAppend() throws Exception {
        if (tedCmdPath == null) {
            throw new IOException("Could not find ted command file.");
        }
        final int rowCount = 1000;
        final int tableCount = 100;
        final int streamCount = 50;
        final int listChecks = 5;
        final String dir = "/tmp/fakeAppend/";

        // The rmdir status is intentionally ignored; only mkdir has to succeed.
        TestCDPSCLIWithCluster.cmdRmdir(dir);
        int retcode = TestCDPSCLIWithCluster.cmdMkdir(dir);
        Assert.assertEquals("mkdir " + dir + " failed", 0, retcode);

        String[] jsrcList = new String[tableCount];
        String[] chgList = new String[streamCount];
        String[] topicFullList = new String[tableCount];
        Table[] jsonTableList = new Table[tableCount];

        for (int i = 0; i < streamCount; ++i) {
            chgList[i] = dir + "chgap" + i;
            TestCDPSUtil.replaceStreamTable(chgList[i], true, 1);
        }
        _logger.info("---1. Changelog streams are created ---");

        // Table i is paired with stream i % streamCount.
        for (int i = 0; i < tableCount; ++i) {
            jsrcList[i] = dir + "jap" + i;
            topicFullList[i] = chgList[i % streamCount] + ":jap" + i;
            jsonTableList[i] = TestCDPSUtil.replaceScfJsonTable(testAdmin, jsrcList[i]);
        }
        _logger.info("---2. Source json tables are created ---");

        for (int i = 0; i < tableCount; ++i) {
            TestCDPSUtil.setupCDPSReplicaWithColumns(
                    jsrcList[i], chgList[i % streamCount], topicFullList[i], false, null);
        }
        _logger.info("---3. Changelog relationships are set ---");

        List<JsonLoader> jloaderList = new ArrayList<JsonLoader>();
        for (int i = 0; i < tableCount; ++i) {
            jloaderList.add(new JsonLoader(0, rowCount, jsonTableList[i], topicFullList[i]));
        }
        _logger.info("---4. JsonLoaders are created ---");

        List<Thread> threadLoaderList = new ArrayList<Thread>();
        for (int i = 0; i < tableCount; ++i) {
            threadLoaderList.add(new Thread((Runnable)jloaderList.get(i), "loader" + i));
        }
        _logger.info("---5. Threads for JsonLoaders created ---");

        String enableCmd = tedCmdPath.toString() + " enable fakeAppend";
        int enableStatus = DBTests.ExecuteShellCmdAndGetReturnCode(enableCmd);
        if (enableStatus != 0) {
            throw new IOException(enableCmd + " failed with exitStatus: " + enableStatus);
        }
        _logger.info("---6. enabled fakeAppend ---");

        // Tracks whether the body finished, so cleanup failures never mask the
        // primary test failure.
        boolean completed = false;
        try {
            for (Thread loaderThread : threadLoaderList) {
                loaderThread.start();
            }
            _logger.info("---7. Threads for JsonLoaders started ---");

            for (Thread loaderThread : threadLoaderList) {
                loaderThread.join();
            }
            _logger.info("---8. Threads for JsonLoaders finished ---");

            Random rand = new Random();
            boolean hasErr = false;
            for (int i = 0; i < listChecks && !hasErr; ++i) {
                String listCmd = "maprcli table changelog list -refreshnow true -path "
                        + jsrcList[rand.nextInt(tableCount)] + " -json";
                String retstr = DBTests.ExecuteShellCmd(listCmd);
                System.out.println(retstr);
                hasErr = retstr.contains("error");
            }
            Assert.assertFalse("changelog list output contains \"error\"", hasErr);
            _logger.info("---9. checked changelog list ---");
            completed = true;
        } finally {
            try {
                for (Table table : jsonTableList) {
                    table.close();
                }
            } finally {
                // Always attempt to switch the fault injection off again.
                String disableCmd = tedCmdPath.toString() + " disable fakeAppend";
                int disableStatus = DBTests.ExecuteShellCmdAndGetReturnCode(disableCmd);
                if (disableStatus != 0) {
                    if (completed) {
                        throw new IOException(disableCmd + " failed with exitStatus: " + disableStatus);
                    }
                    // The body already failed; report but let that failure propagate.
                    _logger.error("{} failed with exitStatus: {}", disableCmd, disableStatus);
                }
            }
        }
        _logger.info("---10. disabled fakeAppend ---");
    }

    // Resolves the source root and, when one is available, the ted binary beneath it.
    // tedCmdPath is reset to null if the resolved file does not exist; it is not
    // assigned at all when no source root is available.
    static {
        repoRootPath = BaseTest.getSourceRoot();
        if (repoRootPath != null) {
            tedCmdPath = repoRootPath.resolve(TED_FILE);
            if (!Files.exists(tedCmdPath)) {
                tedCmdPath = null;
            }
        }
    }
}

