/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.tests;

import com.mapr.baseutils.BinaryString;
import com.mapr.db.FamilyDescriptor;
import com.mapr.db.Table;
import com.mapr.db.TableDescriptor;
import com.mapr.db.cdc.impl.ChangeDataReaderImpl;
import com.mapr.db.cdc.impl.ChangeDataRecordImpl;
import com.mapr.db.cdc.impl.ChangeDataRecordImplBinary;
import com.mapr.db.cdc.impl.ChangeNodeImpl;
import com.mapr.db.impl.AdminImpl;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.tests.utils.DBTests;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.ojai.Document;
import org.ojai.FieldPath;
import org.ojai.KeyValue;
import org.ojai.Value;
import org.ojai.store.cdc.ChangeDataReader;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeEvent;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.store.cdc.ChangeOp;
import org.ojai.types.OTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestCDPSUtil {
    // Field paths for the test documents. P_MAPB, P_MAPBC and P_MAPE are used by
    // replaceMcfJsonTable as the JSON paths of column families cf2, cf3 and cf5.
    private static final FieldPath P_MAPB = FieldPath.parseFrom((String)"mapB");
    private static final FieldPath P_MAPBC = FieldPath.parseFrom((String)"mapB.mapC");
    // NOTE(review): P_MAPD and P_ARRAYF are not referenced by any method visible in this class.
    private static final FieldPath P_MAPD = FieldPath.parseFrom((String)"mapD");
    private static final FieldPath P_MAPE = FieldPath.parseFrom((String)"mapE");
    private static final FieldPath P_ARRAYF = FieldPath.parseFrom((String)"arrayF");
    // Default "breaknum" for fetchChangeDataWithBreak; fetchChangeData passes the same value (30).
    static final int Default_BreakNum = 30;
    // SLF4J logger for this class; the visible methods print via System.out instead.
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSUtil.class);

    /**
     * Concatenates the array elements into a single string, inserting
     * {@code delimiter} between consecutive elements.
     *
     * @param theAray   elements to join, in order
     * @param delimiter text placed between neighbouring elements
     * @return the joined text; empty when the array has no elements
     */
    private static String StringArraytoString(String[] theAray, String delimiter) {
        StringBuilder joined = new StringBuilder();
        boolean needsDelimiter = false;
        for (String element : theAray) {
            if (needsDelimiter) {
                joined.append(delimiter);
            }
            joined.append(element);
            needsDelimiter = true;
        }
        return joined.toString();
    }

    /**
     * Prints a change data record to stdout: a separator line, the record
     * header, then the change nodes as seen through the iterator and through
     * the reader.
     *
     * @param cdr the record to print
     */
    public static void printCDRec(ChangeDataRecord cdr) {
        System.out.println("\n--------------------");
        printChangeRecHeader(cdr);
        printChangeDataThroughIter(cdr);
        printChangeDataThroughReader(cdr);
    }

    /**
     * Advances the reader by one event and returns the change node it is then
     * positioned on. Asserts that both the event and the node are non-null.
     *
     * @param cdr reader to advance; must be a {@link ChangeDataReaderImpl}
     * @return the change node at the new reader position
     */
    public static ChangeNode moveToNextNodeReader(ChangeDataReader cdr) {
        ChangeDataReaderImpl readerImpl = (ChangeDataReaderImpl) cdr;
        ChangeEvent nextEvent = readerImpl.next();
        Assert.assertNotNull(nextEvent);
        ChangeNodeImpl currentNode = readerImpl.getChangeNode();
        Assert.assertNotNull(currentNode);
        return currentNode;
    }

    /**
     * Binary-table variant of {@code moveToNextNodeReader}: advances the reader
     * by one event and returns the change node at the new position, asserting
     * that the event and the node are both non-null.
     *
     * @param cdr reader to advance; must be a
     *            {@link ChangeDataRecordImplBinary.ChangeDataReaderImplBinary}
     * @return the change node at the new reader position
     */
    public static ChangeNode moveToNextNodeReaderBinary(ChangeDataReader cdr) {
        ChangeDataRecordImplBinary.ChangeDataReaderImplBinary binaryReader =
                (ChangeDataRecordImplBinary.ChangeDataReaderImplBinary) cdr;
        ChangeEvent nextEvent = binaryReader.next();
        Assert.assertNotNull(nextEvent);
        ChangeNodeImpl currentNode = binaryReader.getChangeNode();
        Assert.assertNotNull(currentNode);
        return currentNode;
    }

    /**
     * Verifies the header of a change data record whose row id is a string.
     *
     * <p>Checks the JSON flag, that the id has type {@code STRING} and equals
     * {@code strRowId}, that the record's op timestamp satisfies the relation
     * selected by {@code optMethod} against {@code opTime}, and that the op
     * type equals {@code hOp}.
     *
     * @param cdrec     record under test; must be a {@link ChangeDataRecordImpl}
     * @param isJson    expected value of {@code isJson()}
     * @param strRowId  expected string row id
     * @param optMethod how the record's op timestamp is compared with {@code opTime}
     * @param opTime    reference timestamp (ignored for {@code IsZero})
     * @param hOp       expected op type
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public static void verifyHead(ChangeDataRecord cdrec, boolean isJson, String strRowId, OpTimeVerifyMethod optMethod, long opTime, ChangeOp hOp) throws IOException {
        ChangeDataRecordImpl cdr = (ChangeDataRecordImpl)cdrec;
        Assert.assertEquals((Object)isJson, (Object)cdr.isJson());
        Assert.assertEquals(Value.Type.STRING, cdr.getId().getType());
        Assert.assertEquals(strRowId, cdr.getId().getString());
        // The relations are asserted with assertTrue: comparing a boxed boolean
        // against a boxed int (as the decompiled code did) can never succeed.
        long actual = cdr.getOpTimestamp();
        switch (optMethod) {
            case IsZero: {
                Assert.assertEquals(0L, actual);
                break;
            }
            case LargerThanGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be > " + opTime, actual > opTime);
                break;
            }
            case EqualGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be == " + opTime, actual == opTime);
                break;
            }
            case LessThanGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be < " + opTime, actual < opTime);
                break;
            }
            case LargerEqualGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be >= " + opTime, actual >= opTime);
                break;
            }
            case LessEqualGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be <= " + opTime, actual <= opTime);
                break;
            }
        }
        Assert.assertEquals(hOp, cdr.getOpType());
    }

    /**
     * Verifies the header of a change data record whose row id is binary.
     *
     * <p>Checks the JSON flag, that the id has type {@code BINARY} and its
     * backing array equals {@code byteRowId} (both are printed to stdout
     * first), that the op timestamp satisfies the relation selected by
     * {@code optMethod} against {@code opTime}, and that the op type equals
     * {@code hOp}.
     *
     * @param cdrec     record under test; must be a {@link ChangeDataRecordImpl}
     * @param isJson    expected value of {@code isJson()}
     * @param byteRowId expected row id bytes
     * @param optMethod how the record's op timestamp is compared with {@code opTime}
     * @param opTime    reference timestamp (ignored for {@code IsZero})
     * @param hOp       expected op type
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public static void verifyHeadBinary(ChangeDataRecord cdrec, boolean isJson, byte[] byteRowId, OpTimeVerifyMethod optMethod, long opTime, ChangeOp hOp) throws IOException {
        ChangeDataRecordImpl cdr = (ChangeDataRecordImpl)cdrec;
        Assert.assertEquals((Object)isJson, (Object)cdr.isJson());
        Assert.assertEquals(Value.Type.BINARY, cdr.getId().getType());
        System.out.println("byteRowId:" + Arrays.toString(byteRowId));
        System.out.println("cn.getId :" + Arrays.toString(cdr.getId().getBinary().array()));
        Assert.assertArrayEquals(byteRowId, cdr.getId().getBinary().array());
        // The relations are asserted with assertTrue: comparing a boxed boolean
        // against a boxed int (as the decompiled code did) can never succeed.
        long actual = cdr.getOpTimestamp();
        switch (optMethod) {
            case IsZero: {
                Assert.assertEquals(0L, actual);
                break;
            }
            case LargerThanGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be > " + opTime, actual > opTime);
                break;
            }
            case EqualGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be == " + opTime, actual == opTime);
                break;
            }
            case LessThanGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be < " + opTime, actual < opTime);
                break;
            }
            case LargerEqualGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be >= " + opTime, actual >= opTime);
                break;
            }
            case LessEqualGivenValue: {
                Assert.assertTrue("op timestamp " + actual + " should be <= " + opTime, actual <= opTime);
                break;
            }
        }
        Assert.assertEquals(hOp, cdr.getOpType());
    }

    /**
     * Verifies that a JSON change data record describes a whole-row delete.
     *
     * <p>The header must be JSON with the given string row id, a non-negative
     * op timestamp and op {@code DELETE}. The first node from the record's
     * iterator and the first node from its reader must each be a
     * {@code NODE}/{@code DELETE} change carrying the record's op timestamp
     * and no value.
     *
     * @param strRowId expected row id
     * @param cdr      record under test
     * @throws IOException propagated from {@code verifyHead}
     */
    public static void verifyRowDelete(String strRowId, ChangeDataRecord cdr) throws IOException {
        verifyHead(cdr, true, strRowId, OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.DELETE);

        // First pass: the iterator view.
        Iterator nodeEntries = cdr.iterator();
        long recordOpTime = cdr.getOpTimestamp();
        KeyValue firstEntry = (KeyValue) nodeEntries.next();
        ChangeNode iteratorNode = (ChangeNode) firstEntry.getValue();
        assertNodeEquals(iteratorNode, OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE, recordOpTime, null, null);

        // Second pass: the reader view.
        ChangeDataReader recordReader = cdr.getReader();
        ChangeNode readerNode = moveToNextNodeReader(recordReader);
        assertNodeEquals(readerNode, OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE, recordOpTime, null, null);
    }

    /**
     * Verifies that a binary-table change data record describes a whole-row
     * delete.
     *
     * <p>The header must be non-JSON with a binary row id equal to the bytes
     * of {@code strRowId}, a non-negative op timestamp and op {@code DELETE}.
     * The first node from the iterator and the first node from the binary
     * reader must each be a {@code NODE}/{@code DELETE} change carrying the
     * record's op timestamp and no value.
     *
     * @param strRowId row id whose bytes are the expected binary key
     * @param cdr      record under test
     * @throws IOException propagated from {@code verifyHeadBinary}
     */
    public static void verifyRowDeleteBinary(String strRowId, ChangeDataRecord cdr) throws IOException {
        // NOTE(review): getBytes() uses the platform default charset — confirm
        // this matches how the row key was encoded when it was written.
        byte[] expectedKey = strRowId.getBytes();
        verifyHeadBinary(cdr, false, expectedKey, OpTimeVerifyMethod.LargerEqualGivenValue, 0L, ChangeOp.DELETE);

        // First pass: the iterator view.
        Iterator nodeEntries = cdr.iterator();
        long recordOpTime = cdr.getOpTimestamp();
        KeyValue firstEntry = (KeyValue) nodeEntries.next();
        ChangeNode iteratorNode = (ChangeNode) firstEntry.getValue();
        assertNodeEquals(iteratorNode, OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE, recordOpTime, null, null);

        // Second pass: the binary reader view.
        ChangeDataRecordImplBinary.ChangeDataReaderImplBinary binaryReader =
                (ChangeDataRecordImplBinary.ChangeDataReaderImplBinary) cdr.getReader();
        ChangeNode readerNode = moveToNextNodeReaderBinary((ChangeDataReader) binaryReader);
        assertNodeEquals(readerNode, OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.NODE, ChangeOp.DELETE, recordOpTime, null, null);
    }

    /**
     * Convenience overload of the eight-argument {@code assertNodeEqualsBinary}
     * that passes {@code false} for the debug flag.
     */
    public static void assertNodeEqualsBinary(ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, byte[] value) {
        final boolean debugOff = false;
        assertNodeEqualsBinary(cnode, optMethod, event, op, opTime, fieldName, value, debugOff);
    }

    /**
     * Asserts that a change node matches the expected event, op, timestamp
     * relation and field name, and that its value holds the expected bytes.
     *
     * @param cnode     node under test; must be a {@link ChangeNodeImpl}
     * @param optMethod how the node's op timestamp is compared with {@code opTime}
     * @param event     expected change event
     * @param op        expected change op
     * @param opTime    reference timestamp
     * @param fieldName expected field name (checked only when the node is in a map)
     * @param value     expected bytes, or {@code null} to require a null node value
     * @param isDebug   forwarded to the field and value checks
     */
    public static void assertNodeEqualsBinary(ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, byte[] value, boolean isDebug) {
        ChangeNodeImpl nodeImpl = (ChangeNodeImpl) cnode;
        assertNodeFieldEquals(nodeImpl, optMethod, event, op, opTime, fieldName, isDebug);
        assertNodeValueEqualsBinary(nodeImpl, value, isDebug);
    }

    /**
     * Convenience overload of the eight-argument {@code assertNodeEquals} that
     * passes {@code false} for the debug flag.
     */
    public static void assertNodeEquals(ChangeNode cn, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, Value value) {
        final boolean debugOff = false;
        assertNodeEquals(cn, optMethod, event, op, opTime, fieldName, value, debugOff);
    }

    /**
     * Asserts that a change node matches the expected event, op, timestamp
     * relation and field name, and that its value equals {@code value}
     * according to {@code valueEquals}.
     *
     * @param cnode     node under test; must be a {@link ChangeNodeImpl}
     * @param optMethod how the node's op timestamp is compared with {@code opTime}
     * @param event     expected change event
     * @param op        expected change op
     * @param opTime    reference timestamp
     * @param fieldName expected field name (checked only when the node is in a map)
     * @param value     expected value
     * @param isDebug   forwarded to the field and value checks
     */
    public static void assertNodeEquals(ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, Value value, boolean isDebug) {
        ChangeNodeImpl nodeImpl = (ChangeNodeImpl) cnode;
        assertNodeFieldEquals(nodeImpl, optMethod, event, op, opTime, fieldName, isDebug);
        assertNodeValueEquals(nodeImpl, value, isDebug);
    }

    /**
     * Asserts that a change node matches the expected event, op, timestamp
     * relation and field name, and that its value is a Java {@code null}.
     *
     * @param cnode node under test; must be a {@link ChangeNodeImpl}
     */
    public static void assertNodeIsJavaNull(ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, boolean isDebug) {
        ChangeNodeImpl nodeImpl = (ChangeNodeImpl) cnode;
        assertNodeFieldEquals(nodeImpl, optMethod, event, op, opTime, fieldName, isDebug);
        Object nodeValue = nodeImpl.getValue();
        Assert.assertNull(nodeValue);
    }

    /**
     * Asserts that a change node matches the expected event, op, timestamp
     * relation and field name, and that it carries an OJAI NULL: the node's
     * value is non-null with type {@code Value.Type.NULL}, and the reader
     * also reports type {@code Value.Type.NULL}.
     *
     * @param cdreader reader whose current type is checked
     * @param cnode    node under test; must be a {@link ChangeNodeImpl}
     */
    public static void assertNodeIsOjaiNull(ChangeDataReader cdreader, ChangeNode cnode, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, boolean isDebug) {
        ChangeNodeImpl nodeImpl = (ChangeNodeImpl) cnode;
        assertNodeFieldEquals(nodeImpl, optMethod, event, op, opTime, fieldName, isDebug);
        Assert.assertNotNull(nodeImpl.getValue());
        boolean nodeHoldsNull = nodeImpl.getValue().getType() == Value.Type.NULL;
        Assert.assertTrue(nodeHoldsNull);
        boolean readerAtNull = cdreader.getType() == Value.Type.NULL;
        Assert.assertTrue(readerAtNull);
    }

    /**
     * Asserts the non-value attributes of a change node: its event, its op,
     * the relation between its op timestamp and {@code opTime} selected by
     * {@code optMethod}, and, when the node is inside a map, its field name.
     *
     * <p>On a timestamp mismatch a "TimeCompareFailed" line is printed to
     * stdout before the assertion fails.
     *
     * @param cn        node under test
     * @param optMethod how the node's op timestamp is compared with {@code opTime}
     * @param event     expected change event
     * @param op        expected change op
     * @param opTime    reference timestamp (ignored for {@code IsZero})
     * @param fieldName expected field name; checked only when {@code cn.inMap()}
     * @param isDebug   accepted for signature compatibility; not used here
     */
    public static void assertNodeFieldEquals(ChangeNodeImpl cn, OpTimeVerifyMethod optMethod, ChangeEvent event, ChangeOp op, long opTime, String fieldName, boolean isDebug) {
        // JUnit convention: expected value first, so failure messages read correctly.
        Assert.assertEquals(event, cn.getEvent());
        Assert.assertEquals(op, cn.getOp());
        long actual = cn.getOpTimestamp();
        switch (optMethod) {
            case IsZero: {
                if (actual != 0L) {
                    System.out.println("TimeCompareFailed: cn.getOpTimestamp(): " + actual + "!=0");
                }
                Assert.assertEquals(0L, actual);
                break;
            }
            case LargerThanGivenValue: {
                assertOpTimeRelation(actual > opTime, actual, ">", opTime);
                break;
            }
            case LargerEqualGivenValue: {
                assertOpTimeRelation(actual >= opTime, actual, ">=", opTime);
                break;
            }
            case EqualGivenValue: {
                assertOpTimeRelation(actual == opTime, actual, "==", opTime);
                break;
            }
            case LessThanGivenValue: {
                assertOpTimeRelation(actual < opTime, actual, "<", opTime);
                break;
            }
            case LessEqualGivenValue: {
                assertOpTimeRelation(actual <= opTime, actual, "<=", opTime);
                break;
            }
        }
        if (cn.inMap()) {
            Assert.assertEquals(fieldName, cn.getFieldName());
        }
    }

    /**
     * Fails the test, after printing a diagnostic line, when a timestamp
     * relation does not hold. Uses assertTrue because comparing a boxed
     * boolean against a boxed int (as the decompiled code did) never succeeds.
     */
    private static void assertOpTimeRelation(boolean holds, long actual, String relation, long opTime) {
        String diagnostic = "TimeCompareFailed: cn.getOpTimestamp():" + actual + " should " + relation + " opTime " + opTime;
        if (!holds) {
            System.out.println(diagnostic);
        }
        Assert.assertTrue(diagnostic, holds);
    }

    /**
     * Asserts that the node's value equals {@code value} as judged by
     * {@code valueEquals}.
     *
     * @param cnode   node whose value is checked
     * @param value   expected value
     * @param isDebug forwarded to {@code valueEquals}
     */
    public static void assertNodeValueEquals(ChangeNodeImpl cnode, Value value, boolean isDebug) {
        boolean matched = valueEquals(value, cnode.getValue(), isDebug);
        Assert.assertEquals((Object) Boolean.TRUE, (Object) Boolean.valueOf(matched));
    }

    /**
     * Asserts that the node's binary value equals the expected bytes. When
     * {@code value} is {@code null} the node's value must be {@code null};
     * otherwise both byte arrays are printed to stdout and compared.
     *
     * @param cnode   node whose value is checked
     * @param value   expected bytes, or {@code null}
     * @param isDebug accepted for signature compatibility; not used here
     */
    public static void assertNodeValueEqualsBinary(ChangeNodeImpl cnode, byte[] value, boolean isDebug) {
        if (value == null) {
            Assert.assertNull(cnode.getValue());
            return;
        }
        System.out.println("   value:" + Arrays.toString(value));
        System.out.println("cn.value:" + Arrays.toString(cnode.getValue().getBinary().array()));
        byte[] actualBytes = cnode.getValue().getBinary().array();
        Assert.assertArrayEquals(value, actualBytes);
    }

    /**
     * Deletes the table at {@code tableName} if it exists and creates it again
     * through the admin's {@code createTable(String)}.
     *
     * @param testAdmin admin to create the table with; when {@code null} a new
     *                  admin is obtained from {@code MapRDBImpl.newAdmin()}
     * @param tableName path of the table to (re)create
     * @return the newly created table
     * @throws IOException propagated from the MapR-DB calls
     */
    public static Table replaceScfJsonTable(AdminImpl testAdmin, String tableName) throws IOException {
        boolean alreadyThere = MapRDBImpl.tableExists(tableName);
        if (alreadyThere) {
            MapRDBImpl.deleteTable(tableName);
        }
        AdminImpl admin = testAdmin != null ? testAdmin : (AdminImpl) MapRDBImpl.newAdmin();
        return admin.createTable(tableName);
    }

    /**
     * Deletes the table at {@code tableName} if it exists and recreates it
     * with four column families: the default family (no compression),
     * {@code cf2} at {@code mapB} (ZLIB), {@code cf3} at {@code mapB.mapC}
     * (no compression) and {@code cf5} at {@code mapE} (no compression).
     *
     * @param testAdmin admin to create the table with; when {@code null} a new
     *                  admin is obtained from {@code MapRDBImpl.newAdmin()}
     * @param tableName path of the table to (re)create
     * @return the newly created table
     * @throws IOException propagated from the MapR-DB calls
     */
    public static Table replaceMcfJsonTable(AdminImpl testAdmin, String tableName) throws IOException {
        if (MapRDBImpl.tableExists(tableName)) {
            MapRDBImpl.deleteTable(tableName);
        }

        Path tablePath = new Path(tableName);
        TableDescriptor descriptor = MapRDBImpl.newTableDescriptor(tablePath);

        FamilyDescriptor defaultFamily = MapRDBImpl.newDefaultFamilyDescriptor()
                .setCompression(FamilyDescriptor.Compression.None);
        descriptor.addFamily(defaultFamily);

        FamilyDescriptor cf2 = MapRDBImpl.newFamilyDescriptor()
                .setName("cf2")
                .setJsonFieldPath(P_MAPB)
                .setCompression(FamilyDescriptor.Compression.ZLIB);
        descriptor.addFamily(cf2);

        FamilyDescriptor cf3 = MapRDBImpl.newFamilyDescriptor()
                .setName("cf3")
                .setJsonFieldPath(P_MAPBC)
                .setCompression(FamilyDescriptor.Compression.None);
        descriptor.addFamily(cf3);

        FamilyDescriptor cf5 = MapRDBImpl.newFamilyDescriptor()
                .setName("cf5")
                .setJsonFieldPath(P_MAPE)
                .setCompression(FamilyDescriptor.Compression.None);
        descriptor.addFamily(cf5);

        AdminImpl admin = testAdmin != null ? testAdmin : (AdminImpl) MapRDBImpl.newAdmin();
        return admin.createTable(descriptor);
    }

    /**
     * Deletes the table at {@code tablePath} if it exists and recreates it
     * with one column family per entry of {@code cfPath} (family name mapped
     * to JSON field path). If the first entry's name is not {@code "default"},
     * a default family descriptor is added ahead of it.
     *
     * @param testAdmin admin to create the table with; when {@code null} a new
     *                  admin is obtained from {@code MapRDBImpl.newAdmin()},
     *                  matching the other table-replacement helpers
     * @param tablePath path of the table to (re)create
     * @param cfPath    family name to JSON field path, in iteration order
     * @return the newly created table
     * @throws IOException propagated from the MapR-DB calls
     */
    public static Table replaceJsonTableWithCFMap(AdminImpl testAdmin, String tablePath, Map<String, String> cfPath) throws IOException {
        if (MapRDBImpl.tableExists((String)tablePath)) {
            MapRDBImpl.deleteTable((String)tablePath);
        }
        TableDescriptor desc = MapRDBImpl.newTableDescriptor((String)tablePath);
        boolean firstEntry = true;
        for (Map.Entry<String, String> cf : cfPath.entrySet()) {
            // Only the first entry decides whether a default family must be injected.
            if (firstEntry && !"default".equals(cf.getKey())) {
                desc.addFamily(MapRDBImpl.newDefaultFamilyDescriptor());
            }
            firstEntry = false;
            desc.addFamily(MapRDBImpl.newFamilyDescriptor((String)cf.getKey(), (String)cf.getValue()));
        }
        // Previously a null admin caused a NullPointerException here.
        if (testAdmin == null) {
            testAdmin = (AdminImpl)MapRDBImpl.newAdmin();
        }
        return testAdmin.createTable(desc);
    }

    /**
     * Recreates the stream at {@code tableName} as a changelog stream with a
     * single default partition.
     *
     * @param tableName path of the stream to (re)create
     * @throws Exception propagated from the three-argument overload
     */
    public static void replaceStreamTable(String tableName) throws Exception {
        final boolean asChangelog = true;
        final int singlePartition = 1;
        replaceStreamTable(tableName, asChangelog, singlePartition);
    }

    /**
     * Deletes the table at {@code tableName} if it exists, then creates a
     * stream at that path by running {@code maprcli stream create}. The
     * command line is echoed to stdout before it is executed.
     *
     * @param tableName    path of the stream to (re)create
     * @param isChangelog  value passed as {@code -ischangelog}
     * @param partitionNum value passed as {@code -defaultpartitions}
     * @throws IOException if the command exits with a non-zero code
     * @throws Exception   propagated from the shell helper
     */
    public static void replaceStreamTable(String tableName, boolean isChangelog, int partitionNum) throws Exception {
        if (MapRDBImpl.tableExists(tableName)) {
            MapRDBImpl.deleteTable(tableName);
        }
        String changelogFlag = isChangelog ? "true" : "false";
        StringBuilder command = new StringBuilder("maprcli stream create -path ");
        command.append(tableName)
               .append(" -ischangelog ").append(changelogFlag)
               .append(" -defaultpartitions ").append(partitionNum);
        String cmdLine = command.toString();
        System.out.println("\n" + cmdLine);
        if (DBTests.ExecuteShellCmdAndGetReturnCode(cmdLine) != 0) {
            throw new IOException("fail to create stream table " + tableName);
        }
    }

    /**
     * Adds a changelog from {@code srcTable} to {@code dstTopicFullName} via
     * {@code maprcli table changelog add}. If that succeeds, lists the topics
     * of {@code dstTable} via {@code maprcli stream topic list}. Both command
     * lines are echoed to stdout.
     *
     * @param srcTable         source table path
     * @param dstTable         destination stream path, used for the topic listing
     * @param dstTopicFullName changelog target passed as {@code -changelog}
     * @return the non-zero exit code of the add command if it failed,
     *         otherwise the exit code of the list command
     * @throws Exception propagated from the shell helper
     */
    public static int setupCDPSReplicaReturnCode(String srcTable, String dstTable, String dstTopicFullName) throws Exception {
        String addCmd = "maprcli table changelog add -path " + srcTable + " -changelog " + dstTopicFullName;
        System.out.println(addCmd);
        int addStatus = DBTests.ExecuteShellCmdAndGetReturnCode(addCmd);
        if (addStatus != 0) {
            return addStatus;
        }

        String listCmd = "maprcli stream topic list -path " + dstTable;
        System.out.println(listCmd);
        return DBTests.ExecuteShellCmdAndGetReturnCode(listCmd);
    }

    /**
     * Sets up changelog replication via {@code setupCDPSReplicaReturnCode} and
     * turns a non-zero result into an exception.
     *
     * @param srcTable source table path
     * @param dstTable destination stream path
     * @param dstTopic changelog target
     * @throws IOException if the setup returned a non-zero code
     * @throws Exception   propagated from the setup call
     */
    public static void setupCDPSReplica(String srcTable, String dstTable, String dstTopic) throws Exception {
        int status = setupCDPSReplicaReturnCode(srcTable, dstTable, dstTopic);
        if (status == 0) {
            return;
        }
        throw new IOException("fail to create dst topic " + dstTopic);
    }

    /**
     * Adds a changelog from {@code srcTable} to {@code dstTopicFullName} via
     * {@code maprcli table changelog add}, passing {@code -paused} and,
     * when {@code columns} is non-null, {@code -columns}. The command line is
     * echoed to stdout.
     *
     * @param srcTable         source table path
     * @param dstTable         not used by this method
     * @param dstTopicFullName changelog target passed as {@code -changelog}
     * @param paused           value passed as {@code -paused}; must not be null
     * @param columns          value for {@code -columns}, or {@code null} to omit it
     * @throws IOException if the command exits with a non-zero code
     * @throws Exception   propagated from the shell helper
     */
    public static void setupCDPSReplicaWithColumns(String srcTable, String dstTable, String dstTopicFullName, Boolean paused, String columns) throws Exception {
        StringBuilder command = new StringBuilder("maprcli table changelog add -path ");
        command.append(srcTable)
               .append(" -changelog ").append(dstTopicFullName)
               .append(" -paused ").append(paused.toString());
        if (columns != null) {
            command.append(" -columns ").append(columns);
        }
        String cmdLine = command.toString();
        System.out.println(cmdLine);
        int status = DBTests.ExecuteShellCmdAndGetReturnCode(cmdLine);
        if (status != 0) {
            throw new IOException("fail to setup changelog replication for " + dstTopicFullName);
        }
    }

    /**
     * Prints a one-line header for a change data record to stdout: whether it
     * is JSON or binary, the id kind and id text (binary ids are rendered with
     * {@code BinaryString.toStringBinary}), the record type name, and the op
     * timestamp both as a number and as an {@code OTimestamp}.
     *
     * @param cdrec record to describe; must be a {@link ChangeDataRecordImpl}
     */
    public static void printChangeRecHeader(ChangeDataRecord cdrec) {
        ChangeDataRecordImpl record = (ChangeDataRecordImpl) cdrec;

        String format = record.isJson() ? "Json" : "Binary";
        boolean stringId = Value.Type.STRING == record.getId().getType();
        String idKind = stringId ? "STRING" : "BINARY";
        String idText = stringId
                ? record.getId().getString()
                : BinaryString.toStringBinary(record.getId().getBinary().array());
        String typeName = record.getType().name();
        long opTime = record.getOpTimestamp();

        StringBuilder line = new StringBuilder();
        line.append("---").append(format)
            .append(" key(").append(idKind).append(":").append(idText)
            .append(")  type(").append(typeName)
            .append(") optime(").append(opTime).append(" ").append(new OTimestamp(opTime)).append(")");
        System.out.println(line.toString());
    }

    /**
     * Prints every change node of the record to stdout by walking the
     * record's iterator; each line shows a running index, the field path and
     * the node's string form.
     *
     * @param cdrec record to print; must be a {@link ChangeDataRecordImpl}
     */
    public static void printChangeDataThroughIter(ChangeDataRecord cdrec) {
        ChangeDataRecordImpl record = (ChangeDataRecordImpl) cdrec;
        System.out.println(" ------itr prints-----");
        int index = 0;
        for (Iterator entries = record.iterator(); entries.hasNext(); ++index) {
            Map.Entry entry = (Map.Entry) entries.next();
            String fieldText = ((FieldPath) entry.getKey()).asPathString();
            ChangeNodeImpl node = (ChangeNodeImpl) entry.getValue();
            System.out.println("node" + index + ": field:" + fieldText + ", value:" + node.toString());
        }
    }

    /**
     * Prints every change node of the record to stdout by stepping the
     * record's reader until {@code next()} returns {@code null}; each line
     * shows a running index and the node's
     * {@code toStringWithArrayIndexTime()} form.
     *
     * @param cdrec record to print; must be a {@link ChangeDataRecordImpl}
     *              whose reader is a {@link ChangeDataReaderImpl}
     */
    public static void printChangeDataThroughReader(ChangeDataRecord cdrec) {
        ChangeDataRecordImpl record = (ChangeDataRecordImpl) cdrec;
        System.out.println(" ------reader prints-----");
        ChangeDataReaderImpl nodeReader = (ChangeDataReaderImpl) record.getReader();
        for (int index = 0; nodeReader.next() != null; ++index) {
            ChangeNodeImpl node = nodeReader.getChangeNode();
            System.out.println("node" + index + ":" + node.toStringWithArrayIndexTime());
        }
    }

    /**
     * Binary-table variant of {@code printChangeDataThroughReader}: steps the
     * record's binary reader until {@code next()} returns {@code null} and
     * prints each node's {@code toStringWithArrayIndexTime()} form with a
     * running index.
     *
     * @param cdrec record to print; must be a {@link ChangeDataRecordImpl}
     *              whose reader is a
     *              {@link ChangeDataRecordImplBinary.ChangeDataReaderImplBinary}
     */
    public static void printChangeDataThroughReaderBinary(ChangeDataRecord cdrec) {
        ChangeDataRecordImpl record = (ChangeDataRecordImpl) cdrec;
        System.out.println(" ------reader prints-----");
        ChangeDataRecordImplBinary.ChangeDataReaderImplBinary nodeReader =
                (ChangeDataRecordImplBinary.ChangeDataReaderImplBinary) record.getReader();
        for (int index = 0; nodeReader.next() != null; ++index) {
            ChangeNodeImpl node = nodeReader.getChangeNode();
            System.out.println("node" + index + ":" + node.toStringWithArrayIndexTime());
        }
    }

    /**
     * Creates a Kafka consumer for change data records and subscribes it to
     * {@code dstTopic}. The consumer uses a byte-array key deserializer, the
     * {@code ChangeDataRecordDeserializer} for values, {@code earliest} offset
     * reset and the group id {@code cdcjsontest1}.
     *
     * @param dstTopic topic to subscribe to
     * @return the subscribed consumer
     * @throws Exception declared for callers
     */
    public static KafkaConsumer<byte[], ChangeDataRecord> startConsumer(String dstTopic) throws Exception {
        Properties config = new Properties();
        config.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        config.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        config.put("auto.offset.reset", "earliest");
        config.put("group.id", "cdcjsontest1");

        KafkaConsumer changeConsumer = new KafkaConsumer(config);

        ArrayList<String> topics = new ArrayList<String>();
        topics.add(dstTopic);
        System.out.println("subscribe to topic:" + dstTopic);
        changeConsumer.subscribe(topics);
        return changeConsumer;
    }

    /**
     * Tells whether a value has type {@code Value.Type.NULL}. When it does
     * not, a "case1" diagnostic line with the value is printed to stdout.
     *
     * @param kv value to inspect; must not be {@code null}
     * @return {@code true} if the value's type is {@code NULL}
     */
    public static boolean valueIsNull(Value kv) {
        if (kv.getType() == Value.Type.NULL) {
            return true;
        }
        System.out.println("case1: not Type.NULL" + kv);
        return false;
    }

    /**
     * Compares two values with the three-argument {@code valueEquals}, with
     * debug output switched off.
     *
     * @param kv1 first value
     * @param kv2 second value
     * @return the result of the three-argument comparison
     */
    public static boolean valueEquals(Value kv1, Value kv2) {
        final boolean debugOff = false;
        return valueEquals(kv1, kv2, debugOff);
    }

    /**
     * Tells whether a value's type is one of the numeric types
     * {@code BYTE}, {@code DECIMAL}, {@code DOUBLE}, {@code FLOAT},
     * {@code INT}, {@code LONG} or {@code SHORT}.
     *
     * @param kv1 value to inspect; must not be {@code null}
     * @return {@code true} for a numeric type
     */
    public static boolean isNumber(Value kv1) {
        List<Value.Type> numericTypes = Arrays.asList(
                Value.Type.BYTE,
                Value.Type.DECIMAL,
                Value.Type.DOUBLE,
                Value.Type.FLOAT,
                Value.Type.INT,
                Value.Type.LONG,
                Value.Type.SHORT);
        return numericTypes.contains(kv1.getType());
    }

    /**
     * Compares two numeric values by their {@code getLong()} results. Returns
     * {@code false} when either value is not numeric according to
     * {@code isNumber}. A "case21" diagnostic line is printed to stdout when
     * the two long values differ.
     *
     * @param kv1 first value
     * @param kv2 second value
     * @return {@code true} if both are numeric and their long values are equal
     */
    public static boolean numberEquals(Value kv1, Value kv2) {
        if (!isNumber(kv1) || !isNumber(kv2)) {
            return false;
        }
        long left = kv1.getLong();
        long right = kv2.getLong();
        boolean same = left == right;
        if (!same) {
            System.out.println("case21: different number kv1 type: " + kv1.getType() + "kv2 type:" + kv2.getType() + ", value: kv1:" + left + " kv2:" + right);
        }
        return same;
    }

    /**
     * Deep-compares two values, printing a numbered "caseN" diagnostic line
     * to stdout on the first mismatch found.
     *
     * <ul>
     *   <li>Two Java nulls are equal; a Java null equals a value of type
     *       {@code NULL}.</li>
     *   <li>Values of different numeric types are compared via
     *       {@code numberEquals}; any other type difference is a mismatch.</li>
     *   <li>Maps are compared entry by entry in sorted key order; arrays are
     *       compared element by element. A size difference is a mismatch.</li>
     *   <li>Everything else is compared with {@code equals}.</li>
     * </ul>
     *
     * @param kv1     first value, may be {@code null}
     * @param kv2     second value, may be {@code null}
     * @param isDebug when {@code true}, prints both values before comparing;
     *                also passed on to nested comparisons
     * @return {@code true} if the values are considered equal
     */
    public static boolean valueEquals(Value kv1, Value kv2, boolean isDebug) {
        if (isDebug) {
            System.out.println("--- kv1:" + kv1 + " kv2:" + kv2 + " ---");
        }
        if (kv1 == null && kv2 == null) {
            return true;
        }
        if (kv1 == null) {
            return TestCDPSUtil.valueIsNull(kv2);
        }
        if (kv2 == null) {
            return TestCDPSUtil.valueIsNull(kv1);
        }
        if (kv1.getType() != kv2.getType()) {
            if (TestCDPSUtil.isNumber(kv1) && TestCDPSUtil.isNumber(kv2)) {
                return TestCDPSUtil.numberEquals(kv1, kv2);
            }
            System.out.println("case2: different Type: kv1 type: " + kv1.getType() + " kv2 type:" + kv2.getType() + " kv1:" + kv1 + " kv2:" + kv2);
            return false;
        }
        Value.Type type = kv1.getType();
        if (type == Value.Type.MAP) {
            Map<String, ?> map1 = kv1.getMap();
            Map<String, ?> map2 = kv2.getMap();
            // Sorted key sets so both maps are walked in the same order.
            Iterator<String> iter1 = new TreeSet<String>(map1.keySet()).iterator();
            Iterator<String> iter2 = new TreeSet<String>(map2.keySet()).iterator();
            // Advance only while BOTH sides have entries; leftovers are reported below.
            while (iter1.hasNext() && iter2.hasNext()) {
                String key1 = iter1.next();
                String key2 = iter2.next();
                if (!Objects.equals(key1, key2)) {
                    System.out.println("case3: different key: key1:" + key1 + " key2:" + key2 + " kv1:" + kv1 + " kv2:" + kv2);
                    return false;
                }
                if (!childValueEquals(map1.get(key1), map2.get(key2), kv1, kv2, 4, isDebug)) {
                    return false;
                }
            }
            if (iter1.hasNext()) {
                System.out.println("case7: kv1 has extra values. kv1:" + kv1 + " kv2:" + kv2);
                return false;
            }
            if (iter2.hasNext()) {
                System.out.println("case8: kv2 has extra values. kv1:" + kv1 + " kv2:" + kv2);
                return false;
            }
            return true;
        }
        if (type == Value.Type.ARRAY) {
            Iterator<?> iter1 = kv1.getList().iterator();
            Iterator<?> iter2 = kv2.getList().iterator();
            // Advance only while BOTH sides have elements; leftovers are reported below.
            while (iter1.hasNext() && iter2.hasNext()) {
                if (!childValueEquals(iter1.next(), iter2.next(), kv1, kv2, 9, isDebug)) {
                    return false;
                }
            }
            if (iter1.hasNext()) {
                System.out.println("case12: kv1 has extra values. kv1:" + kv1 + " kv2:" + kv2);
                return false;
            }
            if (iter2.hasNext()) {
                System.out.println("case13: kv2 has extra values. kv1:" + kv1 + " kv2:" + kv2);
                return false;
            }
            return true;
        }
        boolean isSame = kv1.equals(kv2);
        if (!isSame) {
            System.out.println("case14: kv1 kv2 mismatch. kv1:" + kv1 + " kv2:" + kv2);
        }
        return isSame;
    }

    /**
     * Compares one pair of container children (map values or array elements).
     * Returns {@code true} when they match, so the caller can go on to the
     * next pair instead of ending the whole comparison early. Diagnostics use
     * case numbers {@code caseBase}, {@code caseBase + 1} and
     * {@code caseBase + 2}; {@code kv1}/{@code kv2} are the enclosing values
     * and appear only in the diagnostic text.
     */
    private static boolean childValueEquals(Object child1, Object child2, Value kv1, Value kv2, int caseBase, boolean isDebug) {
        if (child1 == null && child2 == null) {
            return true;
        }
        if (child1 == null) {
            if (child2 instanceof Value) {
                return TestCDPSUtil.valueIsNull((Value)child2);
            }
            System.out.println("case" + caseBase + ": child2 not null: child2:" + child2 + " kv1:" + kv1 + " kv2:" + kv2);
            return false;
        }
        if (child2 == null) {
            if (child1 instanceof Value) {
                return TestCDPSUtil.valueIsNull((Value)child1);
            }
            System.out.println("case" + (caseBase + 1) + ": child1 not null: child1:" + child1 + " kv1:" + kv1 + " kv2:" + kv2);
            return false;
        }
        // Recurse only when BOTH children are Values; otherwise fall back to equals.
        boolean childMatched = child1 instanceof Value && child2 instanceof Value
                ? TestCDPSUtil.valueEquals((Value)child1, (Value)child2, isDebug)
                : Objects.equals(child1, child2);
        if (!childMatched) {
            System.out.println("case" + (caseBase + 2) + ": child mismatch: child1:" + child1 + " child2:" + child2 + " kv1:" + kv1 + " kv2:" + kv2);
        }
        return childMatched;
    }

    /**
     * Copies a document by serialising it to a JSON string and building a new
     * document from that string.
     *
     * @param srcDoc document to copy
     * @return a new document built from {@code srcDoc}'s JSON string
     * @throws IOException declared for callers
     */
    public static Document copyDoc(Document srcDoc) throws IOException {
        String json = srcDoc.asJsonString();
        return MapRDBImpl.newDocument(json);
    }

    /**
     * Fetches change data records with the default break limit
     * ({@code Default_BreakNum}, i.e. 30).
     *
     * @param recnum   number of records expected
     * @param consumer consumer to poll
     * @return the records collected by {@code fetchChangeDataWithBreak}
     * @throws Exception propagated from {@code fetchChangeDataWithBreak}
     */
    public static List<ConsumerRecord<byte[], ChangeDataRecord>> fetchChangeData(int recnum, KafkaConsumer<byte[], ChangeDataRecord> consumer) throws Exception {
        return fetchChangeDataWithBreak(recnum, consumer, Default_BreakNum);
    }

    /**
     * Polls the consumer repeatedly and collects records until exactly
     * {@code recnum} have been gathered, more than {@code recnum} have been
     * gathered (an error line is printed), or the poll budget is exhausted.
     *
     * <p>Each poll uses a timeout argument of {@code max(recnum, 100)}. Every
     * 100th poll prints an info line, and polling stops once
     * {@code polls / 100} exceeds {@code breaknum}. Records returned by the
     * final poll are kept rather than dropped.
     *
     * @param recnum   number of records expected
     * @param consumer consumer to poll
     * @param breaknum poll budget in units of 100 polls
     * @return the records collected, possibly fewer or more than {@code recnum}
     * @throws Exception declared for callers
     */
    public static List<ConsumerRecord<byte[], ChangeDataRecord>> fetchChangeDataWithBreak(int recnum, KafkaConsumer<byte[], ChangeDataRecord> consumer, int breaknum) throws Exception {
        int pollNum = Math.max(recnum, 100);
        List<ConsumerRecord<byte[], ChangeDataRecord>> recList = new ArrayList<ConsumerRecord<byte[], ChangeDataRecord>>();
        int pollCount = 0;
        while (true) {
            ConsumerRecords<byte[], ChangeDataRecord> crecs = consumer.poll((long)pollNum);
            int fetchedNum = crecs.count();
            if (++pollCount % 100 == 0) {
                System.out.println("Info: get " + fetchedNum + " records");
            }
            // Store what was polled before any exit check so consumed records are never lost.
            for (ConsumerRecord<byte[], ChangeDataRecord> rec : crecs) {
                recList.add(rec);
            }
            if (recList.size() == recnum) {
                return recList;
            }
            if (recList.size() > recnum) {
                // Report the accumulated total, not just the size of the last poll.
                System.out.println("Error: Expecting " + recnum + " records, get " + recList.size() + " records");
                return recList;
            }
            if (pollCount / 100 > breaknum) {
                return recList;
            }
        }
    }

    /**
     * Reads the document with the given row id from the table, prints it to
     * stdout and optionally sleeps for one second afterwards.
     *
     * @param rowid     id of the document to read
     * @param jsonTable table to read from
     * @param pause     when {@code true}, sleep one second before returning
     * @return the result of {@code jsonTable.findById(rowid)}
     * @throws IOException          declared for callers
     * @throws InterruptedException if the sleep is interrupted
     */
    public static Document fetchTableRecAndPause(String rowid, Table jsonTable, boolean pause) throws IOException, InterruptedException {
        Document fetched = jsonTable.findById(rowid);
        System.out.println("ReadDoc:" + fetched);
        if (!pause) {
            return fetched;
        }
        TimeUnit.SECONDS.sleep(1L);
        return fetched;
    }

    public static enum OpTimeVerifyMethod {
        IsZero,
        LargerThanGivenValue,
        LargerEqualGivenValue,
        EqualGivenValue,
        LessThanGivenValue,
        LessEqualGivenValue;

    }
}

