/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.tests;

import com.mapr.baseutils.BinaryString;
import com.mapr.db.FamilyDescriptor;
import com.mapr.db.Table;
import com.mapr.db.TableDescriptor;
import com.mapr.db.cdc.impl.ChangeDataReaderImpl;
import com.mapr.db.cdc.impl.ChangeDataRecordImpl;
import com.mapr.db.cdc.impl.ChangeDataRecordImplBinary;
import com.mapr.db.cdc.impl.ChangeNodeImpl;
import com.mapr.db.impl.AdminImpl;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.rowcol.DBDocumentImpl;
import com.mapr.fs.utils.ssh.TestCluster;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.ojai.Document;
import org.ojai.FieldPath;
import org.ojai.KeyValue;
import org.ojai.Value;
import org.ojai.store.cdc.ChangeDataReader;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeEvent;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.store.cdc.ChangeOp;
import org.ojai.types.ODate;
import org.ojai.types.OTime;
import org.ojai.types.OTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestCDPSUtil {
    // String constants for the CDC JSON format. Judging by their names they are the
    // keys, type tags and operation markers of that format -- TODO confirm against the
    // producer of the JSON; none of them is referenced in the part of the file reviewed here.

    // Record-level and field-level keys.
    public static final String CDCJsonFmtDoc = "$$document";
    public static final String CDCJsonFmtID = "_id";
    public static final String CDCJsonFmtRowOpType = "$opType";
    public static final String CDCJsonFmtRowOpTime = "$opTime";
    public static final String CDCJsonFmtFieldPath = "$fieldPath";
    public static final String CDCJsonFmtFieldOp = "$fieldOp";
    public static final String CDCJsonFmtFieldValue = "$fieldValue";
    public static final String CDCJsonFmtFieldVersion = "$fieldVersion";
    public static final String CDCJsonFmtMutations = "$mutations";
    // Value type tags.
    public static final String CDCJsonFmtTagBYTE = "$numberByte";
    public static final String CDCJsonFmtTagSHORT = "$numberShort";
    public static final String CDCJsonFmtTagINT = "$numberInt";
    public static final String CDCJsonFmtTagLONG = "$numberLong";
    public static final String CDCJsonFmtTagFLOAT = "$numberFloat";
    public static final String CDCJsonFmtTagDECIMAL = "$decimal";
    public static final String CDCJsonFmtTagDATE = "$dateDay";
    public static final String CDCJsonFmtTagTIME = "$time";
    public static final String CDCJsonFmtTagTIMESTAMP = "$date";
    public static final String CDCJsonFmtTagINTERVAL = "$interval";
    public static final String CDCJsonFmtTagBINARY = "$binary";
    // Record-level operation markers.
    public static final String CDCJsonFmtRECORD_OP_INSERT = "$RECORD_INSERT";
    public static final String CDCJsonFmtRECORD_OP_UPDATE = "$RECORD_UPDATE";
    public static final String CDCJsonFmtRECORD_OP_DELETE = "$RECORD_DELETE";
    // Field-level operation markers.
    public static final String CDCJsonFmtOP_NULL = "$NULL";
    public static final String CDCJsonFmtOP_SET = "$SET";
    public static final String CDCJsonFmtOP_MERGE = "$MERGE";
    public static final String CDCJsonFmtOP_DELETE = "$DELETE";
    public static final String CDCJsonFmtOP_DELETE_EXACT = "$DELETE_EXACT";
    public static final String CDCJsonFmtOP_PUT = "$PUT";
    // Field paths used as the JSON field path of the column families created by the
    // replaceMcfJsonTable* helpers in this class.
    private static final FieldPath P_MAPB = FieldPath.parseFrom((String)"mapB");
    private static final FieldPath P_MAPBC = FieldPath.parseFrom((String)"mapB.mapC");
    private static final FieldPath P_MAPD = FieldPath.parseFrom((String)"mapD");
    private static final FieldPath P_MAPE = FieldPath.parseFrom((String)"mapE");
    private static final FieldPath P_MAPBCF = FieldPath.parseFrom((String)"mapB.mapC.mapF");
    private static final FieldPath P_ARRAYG = FieldPath.parseFrom((String)"arrayG");
    private static final FieldPath P_ARRAYH = FieldPath.parseFrom((String)"arrayH");
    private static final FieldPath P_ARRAYI = FieldPath.parseFrom((String)"arrayI");
    private static final FieldPath P_ARRAYBJ = FieldPath.parseFrom((String)"mapB.arrayJ");
    private static final FieldPath P_ARRAYBCK = FieldPath.parseFrom((String)"mapB.mapC.arrayK");
    private static final FieldPath P_ARRAYBCFL = FieldPath.parseFrom((String)"mapB.mapC.mapF.arrayL");
    // NOTE(review): not referenced in the part of the file reviewed here; presumably a
    // default retry/loop limit -- confirm against its users.
    static final int Default_BreakNum = 30;
    // Shared logger for all helpers in this class.
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSUtil.class);

    /**
     * Builds a sample document that exercises a wide range of value types: primitive
     * arrays, mixed-type arrays and lists, nested {@code map.*} fields holding boolean,
     * string, byte, short, int, long, float, double, date, time and timestamp values,
     * an explicit null field, and a 100-byte binary field.
     *
     * @return a freshly created document; every call returns a new instance
     */
    public static Document getAllTypeRecord() {
        DBDocumentImpl doc = new DBDocumentImpl();
        doc.setArray("Scores", new int[]{10, 20, 30})
            .setArray("Friends", new Object[]{"Anurag", "Bharat", Integer.valueOf(10)})
            .set("map.boolean", true)
            .set("map.string", "string")
            .set("map.byte", (byte)100)
            .set("map.short", (short)10000)
            .set("map.int", 50000)
            .set("map.long", 12345678999L)
            .set("map.float1A", 10.1234f)
            .set("map.float2A", 10.123456f)
            .set("map.float3A", 10.123457f)
            .set("map.float4A", 10.123457f)
            .set("map.float1B", -1.358f)
            .set("map.float2B", -1.358883f)
            .set("map.float3B", -1.3588837f)
            .set("map.float4B", -1.3588837f)
            .set("map.double1A", 10.12345)
            .set("map.double2A", 10.123456789012344)
            .set("map.double3A", 10.123456789012346)
            .set("map.double4A", 10.123456789012346)
            .set("map.double1B", -3.793147)
            .set("map.double2B", -3.793147169219067)
            .set("map.double3B", -3.793147169219068)
            .set("map.double4B", -3.793147169219068)
            .set("map.posdate", new ODate(2018, 3, 26))
            .set("map.postime", new OTime(15, 16, 17, 18))
            .set("map.postimestamp", new OTimestamp(2017, 2, 25, 1, 2, 3, 4))
            .set("map.negdate", new ODate(1018, 3, 26))
            .set("map.negtimestamp", new OTimestamp(1016, 2, 25, 1, 2, 3, 4))
            .setArray("map.Array2", new Object[]{Double.valueOf("-2321232.1234312"), Long.valueOf(-50000L), Integer.valueOf(10)})
            .setNull("NULL");

        // 100 bytes holding the values 0..99, exposed as a buffer positioned at 0.
        byte[] rawBytes = new byte[100];
        for (int b = 0; b < rawBytes.length; b++) {
            rawBytes[b] = (byte)b;
        }
        ByteBuffer binary = ByteBuffer.wrap(rawBytes);
        doc.set("binary3", binary)
            .set("Time", new OTime(10000000L))
            .set("Date", new ODate(432000000L));

        doc.set("boolean", false);
        doc.set("string", "stringstrinstringstring");
        doc.set("escapestring", "escape\b\f\n\r\t\"\\string");
        doc.set("byte", (byte)100);

        // Flat mixed-type list.
        ArrayList<Object> mixedList = new ArrayList<Object>();
        mixedList.add("Field1");
        mixedList.add(Integer.valueOf(500));
        mixedList.add(Double.valueOf(5555.5555));
        doc.set("map.LIST", mixedList);

        // Same mixed-type list with an int array appended as a nested element.
        ArrayList<Object> mixedListWithArray = new ArrayList<Object>();
        mixedListWithArray.add("Field1");
        mixedListWithArray.add(Integer.valueOf(500));
        mixedListWithArray.add(Double.valueOf(5555.5555));
        mixedListWithArray.add(new int[]{500, 1000, 1500, 2000});
        doc.set("map.LIST2", mixedListWithArray);

        doc.set("NAME", "ANURAG");
        doc.setArray("map.boolarray", new boolean[]{false, true, true});
        return doc;
    }

    /**
     * Extracts the values of the requested fields from line-oriented JSON text.
     *
     * <p>Each line is split on {@code ':'}; only lines that split into exactly two parts
     * are considered. A line matches a field when its left part, trimmed, equals the
     * quoted field name (case-insensitive). The right part is trimmed and then stripped
     * of one leading quote, one trailing comma and one trailing quote, in that order.
     * When several lines match the same field, the last one wins.
     *
     * @param fieldNames names of the fields to look up
     * @param jsonstr    the text to scan, one {@code "name": value} pair per line
     * @return an array parallel to {@code fieldNames}; entries stay {@code null} for
     *         fields that were not found
     * @throws Exception declared for callers; nothing in this body throws a checked exception
     */
    public static String[] getFieldsFromCliJsonStr(String[] fieldNames, String jsonstr) throws Exception {
        _logger.info("input fieldNames " + Arrays.toString(fieldNames));
        String[] lines = jsonstr.split("\\n");
        Assert.assertTrue(lines.length >= 0);
        // A new String[] is already filled with nulls, i.e. "not found".
        String[] extracted = new String[fieldNames.length];
        for (String line : lines) {
            String[] keyAndValue = line.split(":");
            if (keyAndValue.length != 2) {
                continue;
            }
            String quotedKey = keyAndValue[0].trim();
            for (int f = 0; f < fieldNames.length; f++) {
                if (!quotedKey.equalsIgnoreCase("\"" + fieldNames[f] + "\"")) {
                    continue;
                }
                String fieldValue = keyAndValue[1].trim();
                if (fieldValue.startsWith("\"")) {
                    fieldValue = fieldValue.substring(1);
                }
                if (fieldValue.endsWith(",")) {
                    fieldValue = fieldValue.substring(0, fieldValue.length() - 1);
                }
                if (fieldValue.endsWith("\"")) {
                    fieldValue = fieldValue.substring(0, fieldValue.length() - 1);
                }
                extracted[f] = fieldValue;
            }
        }
        return extracted;
    }

    /**
     * Tells whether two comma-separated path lists contain the same paths, ignoring
     * both order and letter case.
     *
     * <p>Both lists are sorted with a case-insensitive ordering before the pairwise
     * case-insensitive comparison. Sorting case-sensitively (as an earlier version did)
     * could put equal-ignoring-case entries at different positions, e.g.
     * {@code "a,B"} vs {@code "A,b"}, and wrongly report a mismatch.
     *
     * <p>Entries are compared exactly as produced by {@code split(",")}; surrounding
     * whitespace is not trimmed.
     *
     * @param jpaths1 first comma-separated list of paths
     * @param jpaths2 second comma-separated list of paths
     * @return {@code true} when both lists hold the same entries (as multisets,
     *         case-insensitively), {@code false} otherwise
     */
    public static boolean jsonPathStrEquals(String jpaths1, String jpaths2) {
        String[] first = jpaths1.split(",");
        String[] second = jpaths2.split(",");
        if (first.length != second.length) {
            return false;
        }
        // The ordering must agree with the equality used below, hence case-insensitive.
        Arrays.sort(first, String.CASE_INSENSITIVE_ORDER);
        Arrays.sort(second, String.CASE_INSENSITIVE_ORDER);
        for (int i = 0; i < first.length; ++i) {
            if (!first[i].equalsIgnoreCase(second[i])) {
                return false;
            }
        }
        return true;
    }

    /**
     * Joins the elements of an array into one string, placing the delimiter between
     * consecutive elements. Despite its name, this method performs no comparison.
     *
     * @param theAray   the elements to join, in order; a {@code null} element is
     *                  rendered as the text {@code "null"}
     * @param delimiter the separator inserted between elements
     * @return the joined string; empty when the array has no elements
     */
    public static String StringArrayEqualsWithOutOrder(String[] theAray, String delimiter) {
        StringBuilder joined = new StringBuilder();
        boolean needsDelimiter = false;
        for (String element : theAray) {
            if (needsDelimiter) {
                joined.append(delimiter);
            }
            joined.append(element);
            needsDelimiter = true;
        }
        return joined.toString();
    }

    /**
     * Repairs a whitespace split that broke a {@code "left | right"} cell into three
     * tokens, by gluing them back together as {@code "left|right"}.
     *
     * <p>Two defects of the earlier version are fixed here:
     * <ul>
     *   <li>the merged token was built from the token before the pipe on both sides
     *       ({@code "a|a"}); the right-hand side is now the token after the pipe,
     *       which is the one being skipped;</li>
     *   <li>the previously emitted token was removed through a separate counter that
     *       drifted from the list size after the first merge, so a second pipe removed
     *       the wrong element or threw {@code IndexOutOfBoundsException}; the last
     *       emitted token is now always the one replaced.</li>
     * </ul>
     *
     * @param valToCorrect tokens of one output line, as produced by splitting on whitespace
     * @return the tokens with every stand-alone {@code "|"} merged into its neighbours
     * @throws AssertionError when a stand-alone {@code "|"} is the first or last token
     */
    public static ArrayList<String> CorrectSpaceSplit(String[] valToCorrect) {
        _logger.info("before correct: " + Arrays.toString(valToCorrect));
        ArrayList<String> correctedVals = new ArrayList<String>();
        int i = 0;
        while (i < valToCorrect.length) {
            if (valToCorrect[i].equals("|")) {
                // A pipe needs a neighbour on each side.
                Assert.assertTrue(i != 0 && i != valToCorrect.length - 1);
                // i != 0 guarantees at least one token has already been emitted.
                String left = correctedVals.remove(correctedVals.size() - 1);
                correctedVals.add(left + "|" + valToCorrect[i + 1]);
                // Skip the pipe and the right-hand token that was just consumed.
                i += 2;
                continue;
            }
            correctedVals.add(valToCorrect[i]);
            ++i;
        }
        _logger.info("after correct: " + correctedVals.toString());
        return correctedVals;
    }

    /**
     * Asserts on the presence (or absence) of a row in tabular command output.
     *
     * <p>The first output line is treated as the header. The first subsequent line that
     * contains {@code key} is the row under test. When both {@code colToVerify} and
     * {@code valToVerify} are given, each listed column is located in the header and the
     * row's cell in that column is compared with the corresponding value: it must equal
     * the value when {@code verifyexist} is true and differ from it otherwise. Finally
     * the row itself must have been found when {@code verifyexist} is true, and must not
     * have been found otherwise.
     *
     * @param cmdretstr   the raw command output
     * @param key         text identifying the row; when {@code null} nothing is verified
     * @param colToVerify header names of the columns to check, or {@code null}
     * @param valToVerify values parallel to {@code colToVerify}, or {@code null}
     * @param verifyexist whether the row (and its values) is expected to be present
     */
    public static void VerifyCmdOutput(String cmdretstr, String key, String[] colToVerify, String[] valToVerify, boolean verifyexist) {
        _logger.info(cmdretstr);
        if (key == null) {
            return;
        }
        String[] outputLines = cmdretstr.split("\\n");
        Assert.assertTrue(outputLines.length >= 0);
        boolean checkColumns = colToVerify != null && valToVerify != null;

        // Map every requested column to its position in the header line (-1 = absent).
        int[] columnPositions = null;
        if (checkColumns) {
            Assert.assertEquals((long)colToVerify.length, (long)valToVerify.length);
            String[] header = outputLines[0].split("\\s+");
            _logger.info("line0: " + Arrays.toString(header));
            columnPositions = new int[colToVerify.length];
            Arrays.fill(columnPositions, -1);
            for (int c = 0; c < colToVerify.length; c++) {
                for (int h = 0; h < header.length; h++) {
                    if (header[h].equals(colToVerify[c])) {
                        columnPositions[c] = h;
                        break;
                    }
                }
            }
        }
        _logger.info("Indexes: " + Arrays.toString(columnPositions));

        // Only the first line containing the key is examined.
        boolean keyFound = false;
        for (int ln = 1; ln < outputLines.length; ln++) {
            String line = outputLines[ln];
            if (!line.contains(key)) {
                continue;
            }
            if (checkColumns) {
                ArrayList<String> cells = TestCDPSUtil.CorrectSpaceSplit(line.split("\\s+"));
                _logger.info("line " + ln + " len:" + cells.size() + "items:" + cells);
                for (int v = 0; v < valToVerify.length; v++) {
                    int position = columnPositions[v];
                    Assert.assertTrue(position >= 0);
                    Object expected = valToVerify[v];
                    Object actual = cells.get(position);
                    if (verifyexist) {
                        Assert.assertEquals(expected, actual);
                    } else {
                        Assert.assertNotEquals(expected, actual);
                    }
                }
            }
            keyFound = true;
            break;
        }
        Assert.assertTrue(keyFound == verifyexist);
    }

    /**
     * Looks for a row in tabular command output and reports whether any of the
     * requested column values matches.
     *
     * <p>The first output line is treated as the header. The first subsequent line that
     * contains {@code key} is examined; when both {@code colToVerify} and
     * {@code valToVerify} are given, the method returns {@code true} as soon as one of
     * the listed columns holds its expected value. In every other case, including a
     * found key with no columns to check, it returns {@code false}.
     *
     * @param cmdretstr   the raw command output
     * @param key         text identifying the row; {@code null} yields {@code false}
     * @param colToVerify header names of the columns to check, or {@code null}
     * @param valToVerify values parallel to {@code colToVerify}, or {@code null}
     * @return {@code true} only when the row was found and at least one value matched
     * @throws AssertionError when the two arrays differ in length or a requested
     *         column is missing from the header while a row is being checked
     */
    public static boolean FindValueInCmdOutput(String cmdretstr, String key, String[] colToVerify, String[] valToVerify) {
        _logger.info(cmdretstr);
        if (key == null) {
            _logger.error("Input Key is null!");
            return false;
        }
        String[] outputLines = cmdretstr.split("\\n");
        Assert.assertTrue(outputLines.length >= 0);
        boolean checkColumns = colToVerify != null && valToVerify != null;

        // Map every requested column to its position in the header line (-1 = absent).
        int[] columnPositions = null;
        if (checkColumns) {
            Assert.assertEquals((long)colToVerify.length, (long)valToVerify.length);
            String[] header = outputLines[0].split("\\s+");
            _logger.info("line0: " + Arrays.toString(header));
            columnPositions = new int[colToVerify.length];
            Arrays.fill(columnPositions, -1);
            for (int c = 0; c < colToVerify.length; c++) {
                for (int h = 0; h < header.length; h++) {
                    if (header[h].equals(colToVerify[c])) {
                        columnPositions[c] = h;
                        break;
                    }
                }
            }
        }
        _logger.info("Indexes: " + Arrays.toString(columnPositions));

        // Only the first line containing the key is examined.
        boolean keyFound = false;
        for (int ln = 1; ln < outputLines.length; ln++) {
            String line = outputLines[ln];
            if (!line.contains(key)) {
                continue;
            }
            if (checkColumns) {
                ArrayList<String> cells = TestCDPSUtil.CorrectSpaceSplit(line.split("\\s+"));
                _logger.info("line " + ln + " len:" + cells.size() + "items:" + cells);
                for (int v = 0; v < valToVerify.length; v++) {
                    int position = columnPositions[v];
                    Assert.assertTrue(position >= 0);
                    String wanted = valToVerify[v];
                    String present = cells.get(position);
                    if (wanted.equals(present)) {
                        _logger.info("valueA " + wanted + " matches valueB " + present);
                        return true;
                    }
                }
            }
            keyFound = true;
            break;
        }
        if (keyFound) {
            _logger.info("Key " + key + " found, but value " + Arrays.toString(valToVerify) + " not found");
        } else {
            _logger.error("Key " + key + " not found");
        }
        return false;
    }

    /**
     * Logs a separator line and then dumps the given change data record three ways:
     * its header, its contents via the iterator, and its contents via the reader.
     *
     * @param cdr the change data record to print
     */
    public static void printCDRec(ChangeDataRecord cdr) {
        _logger.info("\n--------------------");
        printChangeRecHeader(cdr);
        printChangeDataThroughIter(cdr);
        printChangeDataThroughReader(cdr);
    }

    /**
     * Advances the reader by one event and returns the change node it is then
     * positioned on, asserting that both the event and the node are non-null.
     *
     * @param cdr the reader to advance; must be a {@code ChangeDataReaderImpl}
     * @return the change node at the reader's new position
     */
    public static ChangeNode moveToNextNodeReader(ChangeDataReader cdr) {
        ChangeDataReaderImpl readerImpl = (ChangeDataReaderImpl)cdr;
        ChangeEvent nextEvent = readerImpl.next();
        Assert.assertNotNull(nextEvent);
        ChangeNodeImpl currentNode = readerImpl.getChangeNode();
        Assert.assertNotNull(currentNode);
        return currentNode;
    }

    /**
     * Binary-table counterpart of {@code moveToNextNodeReader}: advances the reader by
     * one event and returns the change node it is then positioned on, asserting that
     * both the event and the node are non-null.
     *
     * @param cdr the reader to advance; must be a
     *            {@code ChangeDataRecordImplBinary.ChangeDataReaderImplBinary}
     * @return the change node at the reader's new position
     */
    public static ChangeNode moveToNextNodeReaderBinary(ChangeDataReader cdr) {
        ChangeDataRecordImplBinary.ChangeDataReaderImplBinary binaryReader =
            (ChangeDataRecordImplBinary.ChangeDataReaderImplBinary)cdr;
        ChangeEvent nextEvent = binaryReader.next();
        Assert.assertNotNull(nextEvent);
        ChangeNodeImpl currentNode = binaryReader.getChangeNode();
        Assert.assertNotNull(currentNode);
        return currentNode;
    }

    /**
     * Verifies the header of a change data record with a string row id: JSON flag,
     * id type and value, operation timestamp and operation type.
     *
     * <p>The timestamp checks previously compared {@code Boolean.TRUE} with an
     * {@code Integer} 0/1 (a decompilation artifact), which can never be equal, so every
     * relational check failed unconditionally. They are now plain boolean assertions
     * carrying a descriptive message.
     *
     * @param cdrec     the record to check; must be a {@code ChangeDataRecordImpl}
     * @param isJson    expected value of the record's JSON flag
     * @param strRowId  expected row id
     * @param optMethod how the record's operation timestamp is compared with {@code opTime}
     * @param opTime    reference timestamp (ignored for {@code IsZero})
     * @param hOp       expected record-level operation
     * @throws IOException declared for callers
     */
    public static void verifyHead(ChangeDataRecord cdrec, boolean isJson, String strRowId, OpTimeVerifyMethod optMethod, long opTime, ChangeOp hOp) throws IOException {
        ChangeDataRecordImpl cdr = (ChangeDataRecordImpl)cdrec;
        Assert.assertEquals((Object)isJson, (Object)cdr.isJson());
        Assert.assertEquals((Object)Value.Type.STRING, (Object)cdr.getId().getType());
        Assert.assertEquals((Object)strRowId, (Object)cdr.getId().getString());
        long recordTime = cdr.getOpTimestamp();
        String subject = "record opTimestamp " + recordTime;
        switch (optMethod) {
            case IsZero: {
                Assert.assertEquals("record opTimestamp should be 0", 0L, recordTime);
                break;
            }
            case LargerThanGivenValue: {
                Assert.assertTrue(subject + " should be > " + opTime, recordTime > opTime);
                break;
            }
            case EqualGivenValue: {
                Assert.assertTrue(subject + " should be == " + opTime, recordTime == opTime);
                break;
            }
            case LessThanGivenValue: {
                Assert.assertTrue(subject + " should be < " + opTime, recordTime < opTime);
                break;
            }
            case LargerEqualGivenValue: {
                Assert.assertTrue(subject + " should be >= " + opTime, recordTime >= opTime);
                break;
            }
            case LessEqualGivenValue: {
                Assert.assertTrue(subject + " should be <= " + opTime, recordTime <= opTime);
                break;
            }
            default: {
                // Any other method: no timestamp check, as before.
                break;
            }
        }
        Assert.assertEquals((Object)hOp, (Object)cdr.getOpType());
    }

    /**
     * Verifies the header of a change data record with a binary row id: JSON flag,
     * id type and bytes, operation timestamp and operation type.
     *
     * <p>The timestamp checks previously compared {@code Boolean.TRUE} with an
     * {@code Integer} 0/1 (a decompilation artifact), which can never be equal, so every
     * relational check failed unconditionally. They are now plain boolean assertions
     * carrying a descriptive message.
     *
     * @param cdrec     the record to check; must be a {@code ChangeDataRecordImpl}
     * @param isJson    expected value of the record's JSON flag
     * @param byteRowId expected row id bytes
     * @param optMethod how the record's operation timestamp is compared with {@code opTime}
     * @param opTime    reference timestamp (ignored for {@code IsZero})
     * @param hOp       expected record-level operation
     * @throws IOException declared for callers
     */
    public static void verifyHeadBinary(ChangeDataRecord cdrec, boolean isJson, byte[] byteRowId, OpTimeVerifyMethod optMethod, long opTime, ChangeOp hOp) throws IOException {
        ChangeDataRecordImpl cdr = (ChangeDataRecordImpl)cdrec;
        Assert.assertEquals((Object)isJson, (Object)cdr.isJson());
        Assert.assertEquals((Object)Value.Type.BINARY, (Object)cdr.getId().getType());
        _logger.info("byteRowId:" + Arrays.toString(byteRowId));
        _logger.info("cn.getId :" + Arrays.toString(cdr.getId().getBinary().array()));
        // NOTE(review): array() exposes the whole backing array regardless of the
        // buffer's position/limit; this assumes the id buffer spans it exactly -- confirm.
        Assert.assertArrayEquals((byte[])byteRowId, (byte[])cdr.getId().getBinary().array());
        long recordTime = cdr.getOpTimestamp();
        String subject = "record opTimestamp " + recordTime;
        switch (optMethod) {
            case IsZero: {
                Assert.assertEquals("record opTimestamp should be 0", 0L, recordTime);
                break;
            }
            case LargerThanGivenValue: {
                Assert.assertTrue(subject + " should be > " + opTime, recordTime > opTime);
                break;
            }
            case EqualGivenValue: {
                Assert.assertTrue(subject + " should be == " + opTime, recordTime == opTime);
                break;
            }
            case LessThanGivenValue: {
                Assert.assertTrue(subject + " should be < " + opTime, recordTime < opTime);
                break;
            }
            case LargerEqualGivenValue: {
                Assert.assertTrue(subject + " should be >= " + opTime, recordTime >= opTime);
                break;
            }
            case LessEqualGivenValue: {
                Assert.assertTrue(subject + " should be <= " + opTime, recordTime <= opTime);
                break;
            }
            default: {
                // Any other method: no timestamp check, as before.
                break;
            }
        }
        Assert.assertEquals((Object)hOp, (Object)cdr.getOpType());
    }

    /**
     * Verifies that a JSON change data record describes the deletion of a whole row:
     * the header must carry the given row id and a DELETE operation, and the first node
     * obtained through the iterator as well as the first node obtained through the
     * reader must be a DELETE node stamped with the record's own operation timestamp.
     *
     * @param strRowId expected row id
     * @param cdr      the change data record to check
     * @throws IOException propagated from the header verification
     */
    public static void verifyRowDelete(String strRowId, ChangeDataRecord cdr) throws IOException {
        verifyHead(cdr, true, strRowId, OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.DELETE);

        // View 1: the record's iterator.
        Iterator entries = cdr.iterator();
        long recordOpTime = cdr.getOpTimestamp();
        KeyValue firstEntry = (KeyValue)entries.next();
        ChangeNode nodeFromIterator = (ChangeNode)firstEntry.getValue();
        assertNodeEquals(nodeFromIterator, OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE, recordOpTime, null, null);

        // View 2: the record's reader.
        ChangeDataReader recordReader = cdr.getReader();
        ChangeNode nodeFromReader = moveToNextNodeReader(recordReader);
        assertNodeEquals(nodeFromReader, OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE, recordOpTime, null, null);
    }

    /**
     * Verifies that a binary (non-JSON) change data record describes the deletion of a
     * whole row: the header must carry the given row id and a DELETE operation, and the
     * first node obtained through the iterator as well as the first node obtained
     * through the binary reader must be a DELETE node stamped with the record's own
     * operation timestamp.
     *
     * @param strRowId expected row id; converted to bytes with the platform default charset
     * @param cdr      the change data record to check
     * @throws IOException propagated from the header verification
     */
    public static void verifyRowDeleteBinary(String strRowId, ChangeDataRecord cdr) throws IOException {
        byte[] expectedId = strRowId.getBytes();
        verifyHeadBinary(cdr, false, expectedId, OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.DELETE);

        // View 1: the record's iterator.
        Iterator entries = cdr.iterator();
        long recordOpTime = cdr.getOpTimestamp();
        KeyValue firstEntry = (KeyValue)entries.next();
        ChangeNode nodeFromIterator = (ChangeNode)firstEntry.getValue();
        assertNodeEquals(nodeFromIterator, OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE, recordOpTime, null, null);

        // View 2: the record's reader, which must be the binary implementation.
        ChangeDataRecordImplBinary.ChangeDataReaderImplBinary binaryReader =
            (ChangeDataRecordImplBinary.ChangeDataReaderImplBinary)cdr.getReader();
        ChangeNode nodeFromReader = moveToNextNodeReaderBinary((ChangeDataReader)binaryReader);
        assertNodeEquals(nodeFromReader, OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE, recordOpTime, null, null);
    }

    /**
     * Asserts a change node's event, operation, timestamp, field name and binary value,
     * with debugging turned off.
     *
     * @see #assertNodeEqualsBinary(ChangeNode, OpTimeVerifyMethod, ChangeEvent, ChangeOp, long, String, byte[], boolean)
     */
    public static void assertNodeEqualsBinary(ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, byte[] value) {
        final boolean debugOff = false;
        assertNodeEqualsBinary(cnode, optMethod, event, op, opTime, fieldName, value, debugOff);
    }

    /**
     * Asserts a change node's event, operation, timestamp and field name, then its
     * binary value.
     *
     * @param cnode     the node to check; must be a {@code ChangeNodeImpl}
     * @param optMethod how the node's operation timestamp is compared with {@code opTime}
     * @param event     expected event
     * @param op        expected operation
     * @param opTime    reference timestamp
     * @param fieldName expected field name
     * @param value     expected bytes, or {@code null} when the node must have no value
     * @param isDebug   passed through to the field and value checks
     */
    public static void assertNodeEqualsBinary(ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, byte[] value, boolean isDebug) {
        ChangeNodeImpl nodeImpl = (ChangeNodeImpl)cnode;
        assertNodeFieldEquals(nodeImpl, optMethod, event, op, opTime, fieldName, isDebug);
        assertNodeValueEqualsBinary(nodeImpl, value, isDebug);
    }

    /**
     * Asserts a change node's event, operation, timestamp, field name and value, with
     * debugging turned off.
     *
     * @see #assertNodeEquals(ChangeNode, OpTimeVerifyMethod, ChangeEvent, ChangeOp, long, String, Value, boolean)
     */
    public static void assertNodeEquals(ChangeNode cn, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, Value value) {
        final boolean debugOff = false;
        assertNodeEquals(cn, optMethod, event, op, opTime, fieldName, value, debugOff);
    }

    /**
     * Asserts a change node's event, operation, timestamp and field name, then its value.
     *
     * @param cnode     the node to check; must be a {@code ChangeNodeImpl}
     * @param optMethod how the node's operation timestamp is compared with {@code opTime}
     * @param event     expected event
     * @param op        expected operation
     * @param opTime    reference timestamp
     * @param fieldName expected field name
     * @param value     expected value
     * @param isDebug   passed through to the field and value checks
     */
    public static void assertNodeEquals(ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, Value value, boolean isDebug) {
        ChangeNodeImpl nodeImpl = (ChangeNodeImpl)cnode;
        assertNodeFieldEquals(nodeImpl, optMethod, event, op, opTime, fieldName, isDebug);
        assertNodeValueEquals(nodeImpl, value, isDebug);
    }

    /**
     * Asserts a change node's event, operation, timestamp and field name, and that the
     * node carries no value at all (its value is Java {@code null}).
     *
     * @param cnode     the node to check; must be a {@code ChangeNodeImpl}
     * @param optMethod how the node's operation timestamp is compared with {@code opTime}
     * @param event     expected event
     * @param op        expected operation
     * @param opTime    reference timestamp
     * @param fieldName expected field name
     * @param isDebug   passed through to the field check
     */
    public static void assertNodeIsJavaNull(ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, boolean isDebug) {
        ChangeNodeImpl nodeImpl = (ChangeNodeImpl)cnode;
        assertNodeFieldEquals(nodeImpl, optMethod, event, op, opTime, fieldName, isDebug);
        Assert.assertNull(nodeImpl.getValue());
    }

    /**
     * Asserts a change node's event, operation, timestamp and field name, and that the
     * node carries a value object whose type is {@code Value.Type.NULL}; the reader must
     * report the same type.
     *
     * @param cdreader  the reader positioned on the node
     * @param cnode     the node to check; must be a {@code ChangeNodeImpl}
     * @param optMethod how the node's operation timestamp is compared with {@code opTime}
     * @param event     expected event
     * @param op        expected operation
     * @param opTime    reference timestamp
     * @param fieldName expected field name
     * @param isDebug   passed through to the field check
     */
    public static void assertNodeIsOjaiNull(ChangeDataReader cdreader, ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, boolean isDebug) {
        ChangeNodeImpl nodeImpl = (ChangeNodeImpl)cnode;
        assertNodeFieldEquals(nodeImpl, optMethod, event, op, opTime, fieldName, isDebug);
        Assert.assertNotNull(nodeImpl.getValue());
        Assert.assertTrue(nodeImpl.getValue().getType() == Value.Type.NULL);
        Assert.assertTrue(cdreader.getType() == Value.Type.NULL);
    }

    /**
     * Asserts the non-value attributes of a change node: its event, its operation, its
     * operation timestamp (relative to {@code opTime}) and, when the node sits in a map,
     * its field name.
     *
     * <p>Changes from the earlier version:
     * <ul>
     *   <li>the relational timestamp checks compared {@code Boolean.TRUE} with an
     *       {@code Integer} 0/1 (a decompilation artifact) and therefore always failed;
     *       they are now boolean assertions;</li>
     *   <li>the failure diagnostic of every branch goes to the logger (one branch wrote
     *       to {@code System.err}) and is also attached to the assertion;</li>
     *   <li>expected values are passed first to {@code assertEquals} so failure
     *       messages read correctly.</li>
     * </ul>
     *
     * @param cn        the node to check
     * @param optMethod how the node's operation timestamp is compared with {@code opTime}
     * @param event     expected event
     * @param op        expected operation
     * @param opTime    reference timestamp (ignored for {@code IsZero})
     * @param fieldName expected field name; only checked when {@code cn.inMap()} is true
     * @param isDebug   currently unused
     */
    public static void assertNodeFieldEquals(ChangeNodeImpl cn, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, boolean isDebug) {
        Assert.assertEquals((Object)event, (Object)cn.getEvent());
        Assert.assertEquals((Object)op, (Object)cn.getOp());

        long nodeTime = cn.getOpTimestamp();
        String prefix = "TimeCompareFailed: cn.getOpTimestamp():" + nodeTime;
        boolean holds = true;
        String failure = null;
        switch (optMethod) {
            case IsZero: {
                holds = nodeTime == 0L;
                failure = "TimeCompareFailed: cn.getOpTimestamp(): " + nodeTime + "!=0";
                break;
            }
            case LargerThanGivenValue: {
                holds = nodeTime > opTime;
                failure = prefix + " should > opTime " + opTime;
                break;
            }
            case LargerEqualGivenValue: {
                holds = nodeTime >= opTime;
                failure = prefix + " should >= opTime " + opTime;
                break;
            }
            case EqualGivenValue: {
                holds = nodeTime == opTime;
                failure = prefix + " should == opTime " + opTime;
                break;
            }
            case LessThanGivenValue: {
                holds = nodeTime < opTime;
                failure = prefix + " should < opTime " + opTime;
                break;
            }
            case LessEqualGivenValue: {
                holds = nodeTime <= opTime;
                failure = prefix + " should <= opTime " + opTime;
                break;
            }
            default: {
                // Any other method: no timestamp check, as before.
                break;
            }
        }
        if (!holds) {
            _logger.info(failure);
        }
        Assert.assertTrue(failure, holds);

        // The field name is only checked for nodes that sit inside a map.
        if (cn.inMap()) {
            Assert.assertEquals((Object)fieldName, (Object)cn.getFieldName());
        }
    }

    /**
     * Asserts that the node's value matches the expected value according to
     * {@code valueEquals}.
     *
     * @param cnode   the node whose value is checked
     * @param value   the expected value
     * @param isDebug passed through to {@code valueEquals}
     */
    public static void assertNodeValueEquals(ChangeNodeImpl cnode, Value value, boolean isDebug) {
        Object comparison = valueEquals(value, cnode.getValue(), isDebug);
        Assert.assertEquals((Object)Boolean.TRUE, comparison);
    }

    /**
     * Asserts the binary value of a node. With a {@code null} expectation the node must
     * have no value; otherwise the backing array of the node's binary value must equal
     * the expected bytes (both are logged first).
     *
     * @param cnode   the node whose value is checked
     * @param value   expected bytes, or {@code null} when the node must have no value
     * @param isDebug currently unused
     */
    public static void assertNodeValueEqualsBinary(ChangeNodeImpl cnode, byte[] value, boolean isDebug) {
        if (value == null) {
            Assert.assertNull(cnode.getValue());
            return;
        }
        _logger.info("   value:" + Arrays.toString(value));
        _logger.info("cn.value:" + Arrays.toString(cnode.getValue().getBinary().array()));
        Assert.assertArrayEquals(value, cnode.getValue().getBinary().array());
    }

    /**
     * Drops the table at the given path if it exists and creates it anew by name only,
     * i.e. without an explicit table descriptor.
     *
     * @param testAdmin admin used to create the table; when {@code null} a new admin is
     *                  obtained from {@code MapRDBImpl.newAdmin()}
     * @param tableName path of the table to (re)create
     * @return the newly created table
     * @throws IOException declared for callers
     */
    public static Table replaceScfJsonTable(AdminImpl testAdmin, String tableName) throws IOException {
        if (MapRDBImpl.tableExists(tableName)) {
            MapRDBImpl.deleteTable(tableName);
        }
        AdminImpl admin = testAdmin != null ? testAdmin : (AdminImpl)MapRDBImpl.newAdmin();
        return admin.createTable(tableName);
    }

    /**
     * Drops the table at the given path if it exists and recreates it with the default
     * column family plus three more: {@code cf2} on {@code mapB} (ZLIB compression),
     * {@code cf3} on {@code mapB.mapC} and {@code cf5} on {@code mapE} (both uncompressed).
     *
     * @param testAdmin admin used to create the table; when {@code null} a new admin is
     *                  obtained from {@code MapRDBImpl.newAdmin()}
     * @param tableName path of the table to (re)create
     * @return the newly created table
     * @throws IOException declared for callers
     */
    public static Table replaceMcfJsonTable(AdminImpl testAdmin, String tableName) throws IOException {
        if (MapRDBImpl.tableExists(tableName)) {
            MapRDBImpl.deleteTable(tableName);
        }
        TableDescriptor descriptor = MapRDBImpl.newTableDescriptor(new Path(tableName));
        FamilyDescriptor defaultFamily = MapRDBImpl.newDefaultFamilyDescriptor().setCompression(FamilyDescriptor.Compression.None);
        descriptor.addFamily(defaultFamily);

        // Additional families, in creation order; only the first one is compressed.
        String[] familyNames = {"cf2", "cf3", "cf5"};
        FieldPath[] familyPaths = {P_MAPB, P_MAPBC, P_MAPE};
        for (int f = 0; f < familyNames.length; f++) {
            FamilyDescriptor.Compression codec = f == 0 ? FamilyDescriptor.Compression.ZLIB : FamilyDescriptor.Compression.None;
            FamilyDescriptor family = MapRDBImpl.newFamilyDescriptor().setName(familyNames[f]).setJsonFieldPath(familyPaths[f]).setCompression(codec);
            descriptor.addFamily(family);
        }

        AdminImpl admin = testAdmin != null ? testAdmin : (AdminImpl)MapRDBImpl.newAdmin();
        return admin.createTable(descriptor);
    }

    /**
     * Drops the table at the given path if it exists and recreates it with the default
     * column family plus eleven more ({@code cf2} .. {@code cf12}), one for each of the
     * map and array field paths declared as constants in this class. Only {@code cf2}
     * uses ZLIB compression; every other family is uncompressed.
     *
     * @param testAdmin admin used to create the table; when {@code null} a new admin is
     *                  obtained from {@code MapRDBImpl.newAdmin()}
     * @param tableName path of the table to (re)create
     * @return the newly created table
     * @throws IOException declared for callers
     */
    public static Table replaceMcfJsonTable2(AdminImpl testAdmin, String tableName) throws IOException {
        if (MapRDBImpl.tableExists(tableName)) {
            MapRDBImpl.deleteTable(tableName);
        }
        TableDescriptor descriptor = MapRDBImpl.newTableDescriptor(new Path(tableName));
        FamilyDescriptor defaultFamily = MapRDBImpl.newDefaultFamilyDescriptor().setCompression(FamilyDescriptor.Compression.None);
        descriptor.addFamily(defaultFamily);

        // Additional families, in creation order; only the first one is compressed.
        String[] familyNames = {
            "cf2", "cf3", "cf4", "cf5", "cf6", "cf7", "cf8", "cf9", "cf10", "cf11", "cf12"
        };
        FieldPath[] familyPaths = {
            P_MAPB, P_MAPBC, P_MAPD, P_MAPE, P_MAPBCF, P_ARRAYG, P_ARRAYH, P_ARRAYI, P_ARRAYBJ, P_ARRAYBCK, P_ARRAYBCFL
        };
        for (int f = 0; f < familyNames.length; f++) {
            FamilyDescriptor.Compression codec = f == 0 ? FamilyDescriptor.Compression.ZLIB : FamilyDescriptor.Compression.None;
            FamilyDescriptor family = MapRDBImpl.newFamilyDescriptor().setName(familyNames[f]).setJsonFieldPath(familyPaths[f]).setCompression(codec);
            descriptor.addFamily(family);
        }

        AdminImpl admin = testAdmin != null ? testAdmin : (AdminImpl)MapRDBImpl.newAdmin();
        return admin.createTable(descriptor);
    }

    /**
     * Drops the table at the given path if it exists and recreates it with the default
     * column family plus eight more ({@code cf2} .. {@code cf9}) placed on nested field
     * paths, listed from the deepest ({@code mA.mB.mC.mD}) up to the top level
     * ({@code mA}, {@code aA}). Only {@code cf2} uses ZLIB compression; every other
     * family is uncompressed.
     *
     * @param testAdmin admin used to create the table; when {@code null} a new admin is
     *                  obtained from {@code MapRDBImpl.newAdmin()}
     * @param tableName path of the table to (re)create
     * @return the newly created table
     * @throws IOException declared for callers
     */
    public static Table replaceMcfJsonTableNested(AdminImpl testAdmin, String tableName) throws IOException {
        if (MapRDBImpl.tableExists(tableName)) {
            MapRDBImpl.deleteTable(tableName);
        }
        // Family paths in creation order; entry k belongs to family "cf" + (k + 2).
        FieldPath[] familyPaths = {
            FieldPath.parseFrom("mA.mB.mC.mD"),
            FieldPath.parseFrom("mA.mB.mC.aD"),
            FieldPath.parseFrom("mA.mB.mC"),
            FieldPath.parseFrom("mA.mB.aC"),
            FieldPath.parseFrom("mA.mB"),
            FieldPath.parseFrom("mA.aB"),
            FieldPath.parseFrom("mA"),
            FieldPath.parseFrom("aA")
        };
        String[] familyNames = {"cf2", "cf3", "cf4", "cf5", "cf6", "cf7", "cf8", "cf9"};

        TableDescriptor descriptor = MapRDBImpl.newTableDescriptor(new Path(tableName));
        FamilyDescriptor defaultFamily = MapRDBImpl.newDefaultFamilyDescriptor().setCompression(FamilyDescriptor.Compression.None);
        descriptor.addFamily(defaultFamily);
        for (int f = 0; f < familyNames.length; f++) {
            FamilyDescriptor.Compression codec = f == 0 ? FamilyDescriptor.Compression.ZLIB : FamilyDescriptor.Compression.None;
            FamilyDescriptor family = MapRDBImpl.newFamilyDescriptor().setName(familyNames[f]).setJsonFieldPath(familyPaths[f]).setCompression(codec);
            descriptor.addFamily(family);
        }

        AdminImpl admin = testAdmin != null ? testAdmin : (AdminImpl)MapRDBImpl.newAdmin();
        return admin.createTable(descriptor);
    }

    /**
     * Drops the table at {@code tablePath} if it exists and recreates it with one column family
     * per entry of {@code cfPath} (key = family name, value = JSON field path).
     *
     * <p>If the first entry seen while iterating {@code cfPath} is not named {@code "default"},
     * a default family descriptor is added before it. Iteration order is whatever the supplied
     * map provides.
     *
     * @param testAdmin admin used to create the table; when {@code null}, one is obtained from
     *                  {@code MapRDBImpl.newAdmin()}, matching the other table-replacing helpers
     * @param tablePath path of the table to replace
     * @param cfPath    family name to JSON field path mapping
     * @return the table returned by {@code createTable}
     * @throws IOException declared for the underlying table/admin calls
     */
    public static Table replaceJsonTableWithCFMap(AdminImpl testAdmin, String tablePath, Map<String, String> cfPath) throws IOException {
        if (MapRDBImpl.tableExists(tablePath)) {
            MapRDBImpl.deleteTable(tablePath);
        }
        TableDescriptor desc = MapRDBImpl.newTableDescriptor(tablePath);
        boolean firstEntry = true;
        for (Map.Entry<String, String> cf : cfPath.entrySet()) {
            // Only the first entry decides whether an implicit default family is needed.
            if (firstEntry && !"default".equals(cf.getKey())) {
                desc.addFamily(MapRDBImpl.newDefaultFamilyDescriptor());
            }
            firstEntry = false;
            desc.addFamily(MapRDBImpl.newFamilyDescriptor(cf.getKey(), cf.getValue()));
        }
        // Fall back to a fresh admin instead of failing with a NullPointerException.
        if (testAdmin == null) {
            testAdmin = (AdminImpl)MapRDBImpl.newAdmin();
        }
        return testAdmin.createTable(desc);
    }

    /**
     * Replaces the stream at {@code tableName} using the defaults: changelog enabled and a
     * single default partition.
     *
     * @param tableName path of the stream to replace
     * @throws Exception propagated from the three-argument overload
     */
    public static void replaceStreamTable(String tableName) throws Exception {
        final boolean isChangelog = true;
        final int partitionNum = 1;
        TestCDPSUtil.replaceStreamTable(tableName, isChangelog, partitionNum);
    }

    /**
     * Replaces the stream at {@code tableName} with topic auto-creation enabled, failing with an
     * exception if the create command does not succeed.
     *
     * @param tableName    path of the stream to replace
     * @param isChangelog  value passed as {@code -ischangelog}
     * @param partitionNum value passed as {@code -defaultpartitions}
     * @throws Exception propagated from {@code replaceStreamTableThrow}
     */
    public static void replaceStreamTable(String tableName, boolean isChangelog, int partitionNum) throws Exception {
        final boolean autocreate = true;
        final boolean throwOnFailure = true;
        TestCDPSUtil.replaceStreamTableThrow(tableName, isChangelog, partitionNum, autocreate, throwOnFailure);
    }

    /**
     * Deletes any existing table at {@code tableName} and creates a stream there by running
     * {@code maprcli stream create} through {@code TestCluster.runCommand}.
     *
     * @param tableName    path of the stream to create
     * @param isChangelog  value passed as {@code -ischangelog}
     * @param partitionNum value passed as {@code -defaultpartitions}
     * @param autocreate   value passed as {@code -autocreate}
     * @param throwexp     when {@code true}, a non-zero exit code raises an {@link IOException}
     * @return the exit code of the create command
     * @throws Exception {@link IOException} on failure when {@code throwexp} is set, or whatever
     *                   the table/command helpers throw
     */
    public static int replaceStreamTableThrow(String tableName, boolean isChangelog, int partitionNum, boolean autocreate, boolean throwexp) throws Exception {
        if (MapRDBImpl.tableExists(tableName)) {
            MapRDBImpl.deleteTable(tableName);
        }
        // Concatenating a boolean yields the same "true"/"false" text the CLI expects.
        String createCmd = "maprcli stream create -path " + tableName
                + " -ischangelog " + isChangelog
                + " -defaultpartitions " + partitionNum
                + " -autocreate " + autocreate;
        _logger.info("\n" + createCmd);
        int exitCode = TestCluster.runCommand(createCmd).getExitCode();
        boolean failed = exitCode != 0;
        if (failed && throwexp) {
            throw new IOException("fail to create stream table " + tableName);
        }
        return exitCode;
    }

    /**
     * Deletes the table at {@code tableName} if it exists.
     *
     * @param tableName path of the table to delete
     * @return {@code true} when the table does not exist; otherwise the result of
     *         {@code MapRDBImpl.deleteTable}
     * @throws Exception propagated from the table helpers
     */
    public static boolean deleteStreamTable(String tableName) throws Exception {
        // Short-circuit: deleteTable is only invoked when the table is present.
        return !MapRDBImpl.tableExists(tableName) || MapRDBImpl.deleteTable(tableName);
    }

    /**
     * Adds a changelog from {@code srcTable} to {@code dstTopicFullName} without an
     * {@code -includecolumns} option and reports the resulting exit code.
     *
     * @param srcTable         source table path
     * @param dstTable         destination stream path
     * @param dstTopicFullName full name of the destination topic
     * @return exit code reported by the four-argument overload
     * @throws Exception propagated from the four-argument overload
     */
    public static int setupCDPSReplicaReturnCode(String srcTable, String dstTable, String dstTopicFullName) throws Exception {
        final String noIncludeColumns = null;
        return TestCDPSUtil.setupCDPSReplicaReturnCode(srcTable, dstTable, dstTopicFullName, noIncludeColumns);
    }

    /**
     * Runs {@code maprcli table changelog add} for {@code srcTable} and, if that succeeds,
     * {@code maprcli stream topic list} for {@code dstTable}.
     *
     * @param srcTable         source table path
     * @param dstTable         destination stream path, used only for the topic listing
     * @param dstTopicFullName value passed as {@code -changelog}
     * @param includeColumns   value passed as {@code -includecolumns}; omitted when {@code null}
     * @return the non-zero exit code of the add command if it failed, otherwise the exit code of
     *         the topic-list command
     * @throws Exception propagated from {@code TestCluster.runCommand}
     */
    public static int setupCDPSReplicaReturnCode(String srcTable, String dstTable, String dstTopicFullName, String includeColumns) throws Exception {
        StringBuilder addCmdBuilder = new StringBuilder("maprcli table changelog add -path ")
                .append(srcTable)
                .append(" -changelog ")
                .append(dstTopicFullName);
        if (includeColumns != null) {
            addCmdBuilder.append(" -includecolumns ").append(includeColumns);
        }
        String addCmd = addCmdBuilder.toString();
        _logger.info(addCmd);
        int addExit = TestCluster.runCommand(addCmd).getExitCode();
        if (addExit != 0) {
            return addExit;
        }
        String listCmd = "maprcli stream topic list -path " + dstTable;
        _logger.info(listCmd);
        return TestCluster.runCommand(listCmd).getExitCode();
    }

    /**
     * Sets up changelog replication via {@code setupCDPSReplicaReturnCode} and converts a
     * non-zero exit code into an {@link IOException}.
     *
     * @param srcTable source table path
     * @param dstTable destination stream path
     * @param dstTopic destination topic passed through to the setup call
     * @throws Exception {@link IOException} when the setup reports a non-zero exit code, or
     *                   whatever the setup call itself throws
     */
    public static void setupCDPSReplica(String srcTable, String dstTable, String dstTopic) throws Exception {
        boolean succeeded = TestCDPSUtil.setupCDPSReplicaReturnCode(srcTable, dstTable, dstTopic) == 0;
        if (succeeded) {
            return;
        }
        throw new IOException("fail to create dst topic " + dstTopic);
    }

    /**
     * Sets up changelog replication with an optional {@code -columns} list and no
     * {@code -includecolumns} option.
     *
     * @param srcTable         source table path
     * @param dstTable         destination stream path
     * @param dstTopicFullName value passed as {@code -changelog}
     * @param paused           value passed as {@code -paused}
     * @param columns          value passed as {@code -columns}; omitted when {@code null}
     * @throws Exception propagated from the six-argument overload
     */
    public static void setupCDPSReplicaWithColumns(String srcTable, String dstTable, String dstTopicFullName, Boolean paused, String columns) throws Exception {
        final String noIncludeColumns = null;
        TestCDPSUtil.setupCDPSReplicaWithColumns(srcTable, dstTable, dstTopicFullName, paused, columns, noIncludeColumns);
    }

    /**
     * Runs {@code maprcli table changelog add} with {@code -paused} and the optional
     * {@code -columns} / {@code -includecolumns} options, failing on a non-zero exit code.
     *
     * @param srcTable         source table path
     * @param dstTable         destination stream path (not used when building the command)
     * @param dstTopicFullName value passed as {@code -changelog}
     * @param paused           value passed as {@code -paused}; must not be {@code null}
     * @param columns          value passed as {@code -columns}; omitted when {@code null}
     * @param includeColumns   value passed as {@code -includecolumns}; omitted when {@code null}
     * @throws Exception {@link IOException} when the command exits non-zero, or whatever
     *                   {@code TestCluster.runCommand} throws
     */
    public static void setupCDPSReplicaWithColumns(String srcTable, String dstTable, String dstTopicFullName, Boolean paused, String columns, String includeColumns) throws Exception {
        StringBuilder cmdBuilder = new StringBuilder("maprcli table changelog add -path ")
                .append(srcTable)
                .append(" -changelog ")
                .append(dstTopicFullName)
                .append(" -paused ")
                .append(paused.toString());
        if (columns != null) {
            cmdBuilder.append(" -columns ").append(columns);
        }
        if (includeColumns != null) {
            cmdBuilder.append(" -includecolumns ").append(includeColumns);
        }
        String addCmd = cmdBuilder.toString();
        _logger.info(addCmd);
        if (TestCluster.runCommand(addCmd).getExitCode() != 0) {
            throw new IOException("fail to setup changelog replication for " + dstTopicFullName);
        }
    }

    /**
     * Runs {@code maprcli table changelog add} with {@code -paused} and
     * {@code -propagateexistingdata}, failing on a non-zero exit code.
     *
     * @param srcTable              source table path
     * @param dstTable              destination stream path (not used when building the command)
     * @param dstTopicFullName      value passed as {@code -changelog}
     * @param paused                value passed as {@code -paused}; must not be {@code null}
     * @param propagateexistingdata value passed as {@code -propagateexistingdata}
     * @throws Exception {@link IOException} when the command exits non-zero, or whatever
     *                   {@code TestCluster.runCommand} throws
     */
    public static void setupCDPSReplicaWithPropagateexistingdata(String srcTable, String dstTable, String dstTopicFullName, Boolean paused, boolean propagateexistingdata) throws Exception {
        String pausedArg = paused.toString();
        // Concatenating the primitive boolean yields the same "true"/"false" text.
        String addCmd = "maprcli table changelog add -path " + srcTable
                + " -changelog " + dstTopicFullName
                + " -paused " + pausedArg
                + " -propagateexistingdata " + propagateexistingdata;
        _logger.info(addCmd);
        int exitCode = TestCluster.runCommand(addCmd).getExitCode();
        if (exitCode == 0) {
            return;
        }
        throw new IOException("fail to setup changelog replication for " + dstTopicFullName);
    }

    /**
     * Logs a one-line header for a change data record: its format (Json/Binary), the key type
     * and key text, the record type name, and the operation timestamp both raw and wrapped in
     * an {@link OTimestamp}.
     *
     * @param cdrec record to describe; must be a {@code ChangeDataRecordImpl}
     */
    public static void printChangeRecHeader(ChangeDataRecord cdrec) {
        ChangeDataRecordImpl record = (ChangeDataRecordImpl)cdrec;
        String format = record.isJson() ? "Json" : "Binary";
        boolean stringKey = Value.Type.STRING == record.getId().getType();
        String keyType = stringKey ? "STRING" : "BINARY";
        // Non-string keys are rendered from their backing byte array.
        String keyText = stringKey
                ? record.getId().getString()
                : BinaryString.toStringBinary(record.getId().getBinary().array());
        StringBuilder header = new StringBuilder("---");
        header.append(format);
        header.append(" key(").append(keyType).append(":").append(keyText);
        header.append(")  type(").append(record.getType().name());
        header.append(") optime(").append(record.getOpTimestamp());
        header.append(" ").append(new OTimestamp(record.getOpTimestamp()));
        header.append(")");
        _logger.info(header.toString());
    }

    /**
     * Logs every (field path, change node) entry of a change data record by walking the
     * record's iterator, numbering the entries from zero.
     *
     * @param cdrec record to dump; must be a {@code ChangeDataRecordImpl}
     */
    public static void printChangeDataThroughIter(ChangeDataRecord cdrec) {
        ChangeDataRecordImpl record = (ChangeDataRecordImpl)cdrec;
        _logger.info(" ------itr prints-----");
        int nodeIndex = 0;
        for (Iterator entries = record.iterator(); entries.hasNext(); ++nodeIndex) {
            Map.Entry entry = (Map.Entry)entries.next();
            String fieldName = ((FieldPath)entry.getKey()).asPathString();
            ChangeNodeImpl node = (ChangeNodeImpl)entry.getValue();
            _logger.info("node" + nodeIndex + ": field:" + fieldName + ", value:" + node.toString());
        }
    }

    /**
     * Logs every change node of a record by stepping through the record's reader. The reader is
     * cast to the JSON or the binary reader implementation depending on {@code isJson()}.
     *
     * @param cdrec record to dump; must be a {@code ChangeDataRecordImpl}
     */
    public static void printChangeDataThroughReader(ChangeDataRecord cdrec) {
        ChangeDataRecordImpl record = (ChangeDataRecordImpl)cdrec;
        _logger.info(" ------reader prints-----");
        if (!record.isJson()) {
            ChangeDataRecordImplBinary.ChangeDataReaderImplBinary binaryReader = (ChangeDataRecordImplBinary.ChangeDataReaderImplBinary)record.getReader();
            // next() returning null marks the end of the record.
            for (int nodeIndex = 0; binaryReader.next() != null; ++nodeIndex) {
                ChangeNodeImpl node = binaryReader.getChangeNode();
                _logger.info("node" + nodeIndex + ":" + node.toStringWithArrayIndexTime());
            }
            return;
        }
        ChangeDataReaderImpl jsonReader = (ChangeDataReaderImpl)record.getReader();
        for (int nodeIndex = 0; jsonReader.next() != null; ++nodeIndex) {
            ChangeNodeImpl node = jsonReader.getChangeNode();
            _logger.info("node" + nodeIndex + ":" + node.toStringWithArrayIndexTime());
        }
    }

    /**
     * Logs every change node of a record by stepping through its reader, which is always cast
     * to the binary reader implementation.
     *
     * @param cdrec record to dump; must be a {@code ChangeDataRecordImpl} whose reader is a
     *              {@code ChangeDataReaderImplBinary}
     */
    public static void printChangeDataThroughReaderBinary(ChangeDataRecord cdrec) {
        ChangeDataRecordImpl record = (ChangeDataRecordImpl)cdrec;
        _logger.info(" ------reader prints-----");
        ChangeDataRecordImplBinary.ChangeDataReaderImplBinary binaryReader = (ChangeDataRecordImplBinary.ChangeDataReaderImplBinary)record.getReader();
        // next() returning null marks the end of the record.
        for (int nodeIndex = 0; binaryReader.next() != null; ++nodeIndex) {
            ChangeNodeImpl node = binaryReader.getChangeNode();
            _logger.info("node" + nodeIndex + ":" + node.toStringWithArrayIndexTime());
        }
    }

    /**
     * Creates a consumer from the supplied properties and subscribes it to a single topic.
     *
     * @param props    consumer configuration handed to the {@code KafkaConsumer} constructor
     * @param dstTopic topic to subscribe to
     * @return the subscribed consumer
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static KafkaConsumer<byte[], ChangeDataRecord> startConsumer(Properties props, String dstTopic) throws Exception {
        KafkaConsumer<byte[], ChangeDataRecord> changeConsumer = new KafkaConsumer<byte[], ChangeDataRecord>(props);
        List<String> topics = new ArrayList<String>(1);
        topics.add(dstTopic);
        _logger.info("subscribe to topic:" + dstTopic);
        changeConsumer.subscribe(topics);
        return changeConsumer;
    }

    /**
     * Creates a change-data consumer with a fixed configuration (byte-array keys,
     * {@code ChangeDataRecordDeserializer} values, offset reset {@code earliest}, group id
     * {@code cdcjsontest1}) and subscribes it to a single topic.
     *
     * @param dstTopic topic to subscribe to
     * @return the subscribed consumer
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static KafkaConsumer<byte[], ChangeDataRecord> startConsumer(String dstTopic) throws Exception {
        Properties config = new Properties();
        config.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        config.setProperty("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        config.setProperty("auto.offset.reset", "earliest");
        config.setProperty("group.id", "cdcjsontest1");
        KafkaConsumer<byte[], ChangeDataRecord> changeConsumer = new KafkaConsumer<byte[], ChangeDataRecord>(config);
        List<String> topics = new ArrayList<String>(1);
        topics.add(dstTopic);
        _logger.info("subscribe to topic:" + dstTopic);
        changeConsumer.subscribe(topics);
        return changeConsumer;
    }

    /**
     * Creates a consumer with byte-array keys and values and offset reset {@code earliest},
     * then subscribes it to a single topic.
     *
     * @param dstTopic topic to subscribe to
     * @param asGroup  when {@code true}, {@code group.id} is set to {@code grpId}
     * @param grpId    consumer group id; only read when {@code asGroup} is {@code true}
     * @return the subscribed consumer
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static KafkaConsumer<byte[], byte[]> startByteArrayConsumer(String dstTopic, boolean asGroup, String grpId) throws Exception {
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        Properties config = new Properties();
        config.setProperty("key.deserializer", byteArrayDeserializer);
        config.setProperty("value.deserializer", byteArrayDeserializer);
        config.setProperty("auto.offset.reset", "earliest");
        if (asGroup) {
            config.setProperty("group.id", grpId);
        }
        KafkaConsumer<byte[], byte[]> byteConsumer = new KafkaConsumer<byte[], byte[]>(config);
        List<String> topics = new ArrayList<String>(1);
        topics.add(dstTopic);
        _logger.info("subscribe to topic:" + dstTopic);
        byteConsumer.subscribe(topics);
        return byteConsumer;
    }

    /**
     * Creates a consumer with byte-array keys, string values and offset reset {@code earliest},
     * then subscribes it to a single topic.
     *
     * @param dstTopic topic to subscribe to
     * @param asGroup  when {@code true}, {@code group.id} is set to {@code grpId}
     * @param grpId    consumer group id; only read when {@code asGroup} is {@code true}
     * @return the subscribed consumer
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static KafkaConsumer<byte[], String> startStringConsumer(String dstTopic, boolean asGroup, String grpId) throws Exception {
        Properties config = new Properties();
        config.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        config.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.setProperty("auto.offset.reset", "earliest");
        if (asGroup) {
            config.setProperty("group.id", grpId);
        }
        KafkaConsumer<byte[], String> stringConsumer = new KafkaConsumer<byte[], String>(config);
        List<String> topics = new ArrayList<String>(1);
        topics.add(dstTopic);
        _logger.info("subscribe to topic:" + dstTopic);
        stringConsumer.subscribe(topics);
        return stringConsumer;
    }

    /**
     * Unsubscribes the consumer from its current subscription and hands the same instance back
     * so calls can be chained.
     *
     * @param consumer consumer to unsubscribe
     * @return the {@code consumer} argument
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static KafkaConsumer<byte[], String> unsubscribeStringConsumer(KafkaConsumer<byte[], String> consumer) throws Exception {
        KafkaConsumer<byte[], String> sameConsumer = consumer;
        sameConsumer.unsubscribe();
        return sameConsumer;
    }

    /**
     * Subscribes an existing string consumer to a single topic and hands the same instance back.
     *
     * @param dstTopic topic to subscribe to
     * @param consumer consumer to subscribe
     * @return the {@code consumer} argument
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static KafkaConsumer<byte[], String> subscribeStringConsumer(String dstTopic, KafkaConsumer<byte[], String> consumer) throws Exception {
        List<String> topics = new ArrayList<String>(1);
        topics.add(dstTopic);
        _logger.info("subscribe topic:" + dstTopic);
        consumer.subscribe(topics);
        return consumer;
    }

    /**
     * Tells whether a value has type {@code Value.Type.NULL}, logging an error when it does not.
     *
     * @param kv value to inspect; must not be {@code null}
     * @return {@code true} when the value's type is {@code NULL}
     */
    public static boolean valueIsNull(Value kv) {
        if (kv.getType() == Value.Type.NULL) {
            return true;
        }
        _logger.error("case1: not Type.NULL" + kv);
        return false;
    }

    /**
     * Compares two values with debug logging switched off.
     *
     * @param kv1 first value, may be {@code null}
     * @param kv2 second value, may be {@code null}
     * @return the result of the three-argument overload with {@code isDebug = false}
     */
    public static boolean valueEquals(Value kv1, Value kv2) {
        final boolean isDebug = false;
        return TestCDPSUtil.valueEquals(kv1, kv2, isDebug);
    }

    /**
     * Tells whether a value's type is one of BYTE, DECIMAL, DOUBLE, FLOAT, INT, LONG or SHORT.
     *
     * @param kv1 value to inspect; must not be {@code null}
     * @return {@code true} when the value's type is one of the listed numeric types
     */
    public static boolean isNumber(Value kv1) {
        List<Value.Type> numericTypes = Arrays.asList(
                Value.Type.BYTE,
                Value.Type.DECIMAL,
                Value.Type.DOUBLE,
                Value.Type.FLOAT,
                Value.Type.INT,
                Value.Type.LONG,
                Value.Type.SHORT);
        return numericTypes.contains(kv1.getType());
    }

    /**
     * Compares two numeric values by their {@code getLong()} results, logging an error when
     * they differ.
     *
     * <p>NOTE(review): because the comparison goes through {@code getLong()}, any fractional
     * part of DOUBLE/FLOAT/DECIMAL values presumably does not take part in it - confirm this
     * leniency is intended.
     *
     * @param kv1 first value; must not be {@code null}
     * @param kv2 second value; must not be {@code null}
     * @return {@code true} when both values are numeric (per {@code isNumber}) and their
     *         {@code long} views are equal; {@code false} otherwise
     */
    public static boolean numberEquals(Value kv1, Value kv2) {
        boolean bothNumeric = TestCDPSUtil.isNumber(kv1) && TestCDPSUtil.isNumber(kv2);
        if (!bothNumeric) {
            return false;
        }
        long first = kv1.getLong();
        long second = kv2.getLong();
        if (first == second) {
            return true;
        }
        _logger.error("case21: different number kv1 type: " + kv1.getType() + "kv2 type:" + kv2.getType() + ", value: kv1:" + first + " kv2:" + second);
        return false;
    }

    /**
     * Deep-compares two values.
     *
     * <ul>
     *   <li>Two {@code null} references are equal; a {@code null} reference equals a value whose
     *       type is {@code NULL}.</li>
     *   <li>Values of different types are equal only when both are numeric and
     *       {@code numberEquals} accepts them.</li>
     *   <li>MAP values are compared key by key in sorted key order; ARRAY values element by
     *       element in list order. Containers of different size are unequal.</li>
     *   <li>Everything else is compared with {@code equals}.</li>
     * </ul>
     *
     * <p>Compared with the previous implementation this version: walks both iterators only while
     * <em>both</em> have elements (so differently sized containers report a mismatch instead of
     * throwing {@code NoSuchElementException}); checks that both children are {@code Value}s
     * before recursing; keeps comparing the remaining entries after a null/NULL child pair
     * instead of returning early; and passes {@code isDebug} down to nested comparisons.
     *
     * @param kv1     first value, may be {@code null}
     * @param kv2     second value, may be {@code null}
     * @param isDebug when {@code true}, the inputs and the reason for a container or type
     *                mismatch are logged
     * @return {@code true} when the two values are considered equal
     */
    public static boolean valueEquals(Value kv1, Value kv2, boolean isDebug) {
        if (isDebug) {
            _logger.info("--- kv1:" + kv1 + " kv2:" + kv2 + " ---");
        }
        if (kv1 == null && kv2 == null) {
            return true;
        }
        if (kv1 == null) {
            return TestCDPSUtil.valueIsNull(kv2);
        }
        if (kv2 == null) {
            return TestCDPSUtil.valueIsNull(kv1);
        }
        if (kv1.getType() != kv2.getType()) {
            if (TestCDPSUtil.isNumber(kv1) && TestCDPSUtil.isNumber(kv2)) {
                return TestCDPSUtil.numberEquals(kv1, kv2);
            }
            if (isDebug) {
                _logger.error("case2: different Type: kv1 type: " + kv1.getType() + " kv2 type:" + kv2.getType() + " kv1:" + kv1 + " kv2:" + kv2);
            }
            return false;
        }
        Value.Type type = kv1.getType();
        if (type == Value.Type.MAP) {
            Map<String, ?> map1 = kv1.getMap();
            Map<String, ?> map2 = kv2.getMap();
            // Sorted key sets make the pairing independent of each map's own iteration order.
            Iterator<String> iter1 = new TreeSet<String>(map1.keySet()).iterator();
            Iterator<String> iter2 = new TreeSet<String>(map2.keySet()).iterator();
            while (iter1.hasNext() && iter2.hasNext()) {
                String key1 = iter1.next();
                String key2 = iter2.next();
                if (!Objects.equals(key1, key2)) {
                    if (isDebug) {
                        _logger.error("case3: different key: key1:" + key1 + " key2:" + key2 + " kv1:" + kv1 + " kv2:" + kv2);
                    }
                    return false;
                }
                // Map children report as case4 / case5 / case6.
                if (!TestCDPSUtil.childValuesMatch(map1.get(key1), map2.get(key2), kv1, kv2, isDebug, 4)) {
                    return false;
                }
            }
            if (iter1.hasNext()) {
                if (isDebug) {
                    _logger.error("case7: kv1 has extra values. kv1:" + kv1 + " kv2:" + kv2);
                }
                return false;
            }
            if (iter2.hasNext()) {
                if (isDebug) {
                    _logger.error("case8: kv2 has extra values. kv1:" + kv1 + " kv2:" + kv2);
                }
                return false;
            }
            return true;
        }
        if (type == Value.Type.ARRAY) {
            List<?> list1 = kv1.getList();
            List<?> list2 = kv2.getList();
            Iterator<?> iter1 = list1.iterator();
            Iterator<?> iter2 = list2.iterator();
            while (iter1.hasNext() && iter2.hasNext()) {
                // Array children report as case9 / case10 / case11.
                if (!TestCDPSUtil.childValuesMatch(iter1.next(), iter2.next(), kv1, kv2, isDebug, 9)) {
                    return false;
                }
            }
            if (iter1.hasNext()) {
                if (isDebug) {
                    _logger.error("case12: kv1 has extra values. kv1:" + kv1 + " kv2:" + kv2);
                }
                return false;
            }
            if (iter2.hasNext()) {
                if (isDebug) {
                    _logger.error("case13: kv2 has extra values. kv1:" + kv1 + " kv2:" + kv2);
                }
                return false;
            }
            return true;
        }
        boolean isSame = kv1.equals(kv2);
        if (!isSame) {
            _logger.error("case14: kv1 kv2 mismatch. kv1:" + kv1 + " kv2:" + kv2);
        }
        return isSame;
    }

    /**
     * Compares one pair of container children for {@code valueEquals}.
     *
     * @param child1    child taken from {@code kv1}, may be {@code null}
     * @param child2    child taken from {@code kv2}, may be {@code null}
     * @param kv1       enclosing first value, used only in log messages
     * @param kv2       enclosing second value, used only in log messages
     * @param isDebug   whether mismatch reasons are logged
     * @param firstCase case number used in the log for "child2 not null"; the following two
     *                  numbers are used for "child1 not null" and "child mismatch"
     * @return {@code true} when the children are considered equal
     */
    private static boolean childValuesMatch(Object child1, Object child2, Value kv1, Value kv2, boolean isDebug, int firstCase) {
        if (child1 == null && child2 == null) {
            return true;
        }
        if (child1 == null) {
            // A missing child only matches a Value of type NULL.
            if (child2 instanceof Value) {
                return TestCDPSUtil.valueIsNull((Value)child2);
            }
            if (isDebug) {
                _logger.error("case" + firstCase + ": child2 not null: child2:" + child2 + " kv1:" + kv1 + " kv2:" + kv2);
            }
            return false;
        }
        if (child2 == null) {
            if (child1 instanceof Value) {
                return TestCDPSUtil.valueIsNull((Value)child1);
            }
            if (isDebug) {
                _logger.error("case" + (firstCase + 1) + ": child1 not null: child1:" + child1 + " kv1:" + kv1 + " kv2:" + kv2);
            }
            return false;
        }
        boolean childMatched = child1 instanceof Value && child2 instanceof Value
                ? TestCDPSUtil.valueEquals((Value)child1, (Value)child2, isDebug)
                : Objects.equals(child1, child2);
        if (!childMatched && isDebug) {
            _logger.error("case" + (firstCase + 2) + ": child mismatch: child1:" + child1 + " child2:" + child2 + " kv1:" + kv1 + " kv2:" + kv2);
        }
        return childMatched;
    }

    /**
     * Builds a new document from the JSON string form of {@code srcDoc}.
     *
     * @param srcDoc document to copy; must not be {@code null}
     * @return the document created by {@code MapRDBImpl.newDocument} from the JSON text
     * @throws IOException declared for callers; nothing is thrown explicitly here
     */
    public static Document copyDoc(Document srcDoc) throws IOException {
        String json = srcDoc.asJsonString();
        return MapRDBImpl.newDocument(json);
    }

    /**
     * Fetches change-data records using the default give-up limit of 30.
     *
     * @param recnum   number of records expected
     * @param consumer consumer to poll
     * @return the records collected by {@code fetchChangeDataWithBreak}
     * @throws Exception propagated from {@code fetchChangeDataWithBreak}
     */
    public static List<ConsumerRecord<byte[], ChangeDataRecord>> fetchChangeData(int recnum, KafkaConsumer<byte[], ChangeDataRecord> consumer) throws Exception {
        final int defaultBreakNum = 30;
        return TestCDPSUtil.fetchChangeDataWithBreak(recnum, consumer, defaultBreakNum);
    }

    /**
     * Polls the consumer repeatedly until exactly {@code recnum} records have been collected,
     * more than {@code recnum} have been collected, or the poll budget is exhausted.
     *
     * <p>The argument handed to {@code poll} is {@code max(recnum, 100)}. The poll counter is
     * incremented on every poll; every 100th poll logs how many records that poll returned, and
     * polling stops once {@code pollCount / 100 > breaknum}. Records returned by the poll that
     * trips this limit are not added to the result.
     *
     * <p>When more than {@code recnum} records end up in the list, an error is logged with the
     * accumulated total (previously the count of the last poll only was reported).
     *
     * @param recnum   number of records expected
     * @param consumer consumer to poll
     * @param breaknum give-up limit, in units of 100 polls
     * @return the records collected so far; may hold fewer or more than {@code recnum}
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static List<ConsumerRecord<byte[], ChangeDataRecord>> fetchChangeDataWithBreak(int recnum, KafkaConsumer<byte[], ChangeDataRecord> consumer, int breaknum) throws Exception {
        int pollNum = recnum > 100 ? recnum : 100;
        List<ConsumerRecord<byte[], ChangeDataRecord>> recList = new ArrayList<ConsumerRecord<byte[], ChangeDataRecord>>();
        int printCount = 0;
        while (true) {
            ConsumerRecords<byte[], ChangeDataRecord> crecs = consumer.poll((long)pollNum);
            if (++printCount % 100 == 0) {
                _logger.info("Info: get " + crecs.count() + " records");
            }
            if (printCount / 100 > breaknum) {
                // Poll budget exhausted: give up with whatever has been collected.
                return recList;
            }
            for (ConsumerRecord<byte[], ChangeDataRecord> rec : crecs) {
                recList.add(rec);
            }
            if (recList.size() == recnum) {
                return recList;
            }
            if (recList.size() > recnum) {
                _logger.error("Error: Expecting " + recnum + " records, get " + recList.size() + " records");
                return recList;
            }
        }
    }

    /**
     * Fetches byte-array records using the default give-up limit of 30.
     *
     * @param recnum   number of records expected
     * @param consumer consumer to poll
     * @return the records collected by {@code fetchByteArrayDataWithBreak}
     * @throws Exception propagated from {@code fetchByteArrayDataWithBreak}
     */
    public static List<ConsumerRecord<byte[], byte[]>> fetchByteArrayData(int recnum, KafkaConsumer<byte[], byte[]> consumer) throws Exception {
        final int defaultBreakNum = 30;
        return TestCDPSUtil.fetchByteArrayDataWithBreak(recnum, consumer, defaultBreakNum);
    }

    /**
     * Polls the consumer repeatedly until exactly {@code recnum} records have been collected,
     * more than {@code recnum} have been collected, or the poll budget is exhausted.
     *
     * <p>The argument handed to {@code poll} is {@code max(recnum, 100)}. Before the poll
     * counter is incremented, every poll whose counter is a multiple of 100 (including the very
     * first) logs how many records it returned, and polling stops once
     * {@code pollCount / 100 > breaknum}. Records returned by the poll that trips this limit
     * are not added to the result.
     *
     * <p>When more than {@code recnum} records end up in the list, an error is logged with the
     * accumulated total (previously the count of the last poll only was reported).
     *
     * @param recnum   number of records expected
     * @param consumer consumer to poll
     * @param breaknum give-up limit, in units of 100 polls
     * @return the records collected so far; may hold fewer or more than {@code recnum}
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static List<ConsumerRecord<byte[], byte[]>> fetchByteArrayDataWithBreak(int recnum, KafkaConsumer<byte[], byte[]> consumer, int breaknum) throws Exception {
        int pollNum = recnum > 100 ? recnum : 100;
        List<ConsumerRecord<byte[], byte[]>> recList = new ArrayList<ConsumerRecord<byte[], byte[]>>();
        int printCount = 0;
        while (true) {
            ConsumerRecords<byte[], byte[]> crecs = consumer.poll((long)pollNum);
            if (printCount % 100 == 0) {
                _logger.info("Info: get " + crecs.count() + " records");
            }
            if (printCount / 100 > breaknum) {
                // Poll budget exhausted: give up with whatever has been collected.
                return recList;
            }
            ++printCount;
            for (ConsumerRecord<byte[], byte[]> rec : crecs) {
                recList.add(rec);
            }
            if (recList.size() == recnum) {
                return recList;
            }
            if (recList.size() > recnum) {
                _logger.error("Error: Expecting " + recnum + " records, get " + recList.size() + " records");
                return recList;
            }
        }
    }

    /**
     * Fetches string records using the default give-up limit of 30.
     *
     * @param recnum   number of records expected
     * @param consumer consumer to poll
     * @return the records collected by {@code fetchStringDataWithBreak}
     * @throws Exception propagated from {@code fetchStringDataWithBreak}
     */
    public static List<ConsumerRecord<byte[], String>> fetchStringData(int recnum, KafkaConsumer<byte[], String> consumer) throws Exception {
        final int defaultBreakNum = 30;
        return TestCDPSUtil.fetchStringDataWithBreak(recnum, consumer, defaultBreakNum);
    }

    /**
     * Polls the consumer repeatedly until exactly {@code recnum} records have been collected,
     * more than {@code recnum} have been collected, or the poll budget is exhausted. The size of
     * the returned list is always logged before returning.
     *
     * <p>The argument handed to {@code poll} is {@code max(recnum, 100)}. Before the poll
     * counter is incremented, every poll whose counter is a multiple of 100 (including the very
     * first) logs how many records it returned, and polling stops - with a "give up" log line -
     * once {@code pollCount / 100 > breaknum}. Records returned by the poll that trips this
     * limit are not added to the result.
     *
     * <p>When more than {@code recnum} records end up in the list, an error is logged with the
     * accumulated total (previously the count of the last poll only was reported).
     *
     * @param recnum   number of records expected
     * @param consumer consumer to poll
     * @param breaknum give-up limit, in units of 100 polls
     * @return the records collected so far; may hold fewer or more than {@code recnum}
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static List<ConsumerRecord<byte[], String>> fetchStringDataWithBreak(int recnum, KafkaConsumer<byte[], String> consumer, int breaknum) throws Exception {
        int pollNum = recnum > 100 ? recnum : 100;
        List<ConsumerRecord<byte[], String>> recList = new ArrayList<ConsumerRecord<byte[], String>>();
        int printCount = 0;
        while (true) {
            ConsumerRecords<byte[], String> crecs = consumer.poll((long)pollNum);
            if (printCount % 100 == 0) {
                _logger.info("Info: get " + crecs.count() + " records");
            }
            if (printCount / 100 > breaknum) {
                _logger.info("Tried " + printCount + " times, give up");
                break;
            }
            ++printCount;
            for (ConsumerRecord<byte[], String> rec : crecs) {
                recList.add(rec);
            }
            if (recList.size() == recnum) {
                break;
            }
            if (recList.size() > recnum) {
                _logger.error("Error: Expecting " + recnum + " records, get " + recList.size() + " records");
                break;
            }
        }
        _logger.info("Return " + recList.size() + " records");
        return recList;
    }

    /**
     * Reads the document with the given row id from the table, logs it, and optionally sleeps
     * for one second before returning it.
     *
     * @param rowid     id passed to {@code findById}
     * @param jsonTable table to read from
     * @param pause     when {@code true}, sleep one second after the read
     * @return the document returned by {@code findById}
     * @throws IOException          declared for the table read
     * @throws InterruptedException if the sleep is interrupted
     */
    public static Document fetchTableRecAndPause(String rowid, Table jsonTable, boolean pause) throws IOException, InterruptedException {
        Document fetched = jsonTable.findById(rowid);
        _logger.info("ReadDoc:" + fetched);
        if (!pause) {
            return fetched;
        }
        TimeUnit.SECONDS.sleep(1L);
        return fetched;
    }

    /**
     * Modes for verifying an operation timestamp.
     *
     * <p>NOTE(review): the code that interprets these constants is outside this section; the
     * per-constant descriptions below are read off the constant names and should be confirmed
     * against that code.
     */
    public static enum OpTimeVerifyMethod {
        // presumably: the op time is expected to be zero
        IsZero,
        // presumably: the op time is expected to be strictly greater than a supplied value
        LargerThanGivenValue,
        // presumably: the op time is expected to be greater than or equal to a supplied value
        LargerEqualGivenValue,
        // presumably: the op time is expected to equal a supplied value
        EqualGivenValue,
        // presumably: the op time is expected to be strictly less than a supplied value
        LessThanGivenValue,
        // presumably: the op time is expected to be less than or equal to a supplied value
        LessEqualGivenValue;

    }
}

