package com.mapr.db.cdc.tests;

import com.mapr.db.cdc.tests.TestCDPSUtil;
import com.mapr.db.tests.utils.DBTests;
import com.mapr.org.apache.hadoop.hbase.util.Bytes;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.KeyValue;
import org.ojai.store.cdc.ChangeDataReader;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeEvent;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.store.cdc.ChangeOp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster test that sets up change-data replication from a binary table to a
 * changelog stream and checks the {@link ChangeDataRecord}s delivered to a
 * Kafka consumer.
 *
 * <p>Each record is checked through its iterator view (flat
 * {@code family.column} field paths) and, where the original checks did so,
 * through its {@link ChangeDataReader} view (nested map events).
 *
 * <p>The source table is created and mutated by shelling out to
 * {@code com.mapr.fs.hbase.test.TestDeletes}; the expected values asserted
 * here mirror what that tool is presumed to write (not visible from this file).
 */
@Category({ClusterTest.class})
public class TestCDPSBinaryWithCluster extends BaseTest {
    /** Prefix of every row key; the row index is appended. */
    private static final String ROW = "row";
    /** Prefix of every column-family name; the family index is appended. */
    private static final String FAM = "fam";
    /** Prefix of every column name; the column index is appended. */
    private static final String COL = "col";
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSBinaryWithCluster.class);
    /** Source table used by {@link #testBinaryConsumer()}. */
    private static String BinTableName1 = "/tmp/bsrc1";
    /** Changelog stream used by {@link #testBinaryConsumer()}. */
    private static String Chglogdst1 = "/tmp/chglogdstbin1";
    /** Stream and topic name, in {@code stream:topic} form, consumed by {@link #testBinaryConsumer()}. */
    private static String TopicFullName = Chglogdst1 + ":bsrc1";
    /** Number of column families checked per row. */
    private static final int FAMCOUNT = 3;
    /** Rows {@code [0, NumRows1)} are checked against string-encoded values. */
    private static int NumRows1 = 3;
    // NOTE(review): presumably the number of versions kept per cell; not referenced by the checks below.
    private static final int MAXVERSION = 5;
    /** Total number of initial rows; rows {@code [NumRows1, NumRows2)} are checked against long-encoded values. */
    private static int NumRows2 = 5;
    /** Number of columns checked per family. */
    private static int NumCols = 2;

    /**
     * Runs the {@code TestDeletes} tool in {@code init} mode against a table.
     *
     * @param str path of the table
     * @throws Exception if the shell command fails
     */
    public static void createTable(String str) throws Exception {
        runTestDeletes(str, "init");
    }

    /**
     * Runs the {@code TestDeletes} tool in {@code delete} mode against a table.
     *
     * @param str path of the table
     * @throws Exception if the shell command fails
     */
    public static void deleteRecFromTable(String str) throws Exception {
        runTestDeletes(str, "delete");
    }

    /**
     * Runs the {@code TestDeletes} tool in {@code colop} mode against a table.
     *
     * @param str path of the table
     * @throws Exception if the shell command fails
     */
    public static void colOpOnTable(String str) throws Exception {
        runTestDeletes(str, "colop");
    }

    /** Builds, echoes and executes one {@code TestDeletes} shell command. */
    private static void runTestDeletes(String tablePath, String action) throws Exception {
        String command = " hbase com.mapr.fs.hbase.test.TestDeletes " + tablePath + " " + action;
        System.out.println("\n" + command);
        DBTests.ExecuteShellCmd(command);
    }

    /** Prints the expected-duration banner before any test runs. */
    @BeforeClass
    public static void startupBeforeClass() throws Exception {
        System.out.println("--- On single node cluster without other workload, these tests takes total about 3 minutes, please wait ---\nTestCDPSBinaryWithCluster#testBinaryConsumer ------ 10 minutes\n");
    }

    /** Prints a completion marker after all tests have run. */
    @AfterClass
    public static void cleanupAfterClass() throws IOException, Exception {
        System.out.println("Done!");
    }

    /** Returns the {@link ChangeNode} carried by the iterator's next entry. */
    private static ChangeNode nextNode(Iterator<?> nodes) {
        return (ChangeNode) ((KeyValue) nodes.next()).getValue();
    }

    /** Asserts that the iterator's next entry is a NODE event with the given op, time, path and value. */
    private static void expectIterNode(Iterator<?> nodes, TestCDPSUtil.OpTimeVerifyMethod timeCheck, ChangeOp op, long opTime, String fieldPath, byte[] value) throws IOException {
        TestCDPSUtil.assertNodeEqualsBinary(nextNode(nodes), timeCheck, ChangeEvent.NODE, op, opTime, fieldPath, value);
    }

    /** Advances the reader and asserts a NODE event with the given op, time, field name and value. */
    private static void expectReaderNode(ChangeDataReader reader, TestCDPSUtil.OpTimeVerifyMethod timeCheck, ChangeOp op, long opTime, String fieldName, byte[] value) throws IOException {
        TestCDPSUtil.assertNodeEqualsBinary(TestCDPSUtil.moveToNextNodeReaderBinary(reader), timeCheck, ChangeEvent.NODE, op, opTime, fieldName, value);
    }

    /** Advances the reader and asserts a START_MAP/MERGE event; {@code fieldName} is null for the record root. */
    private static void expectMapStart(ChangeDataReader reader, long opTime, String fieldName) throws IOException {
        TestCDPSUtil.assertNodeEquals(TestCDPSUtil.moveToNextNodeReaderBinary(reader), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.START_MAP, ChangeOp.MERGE, opTime, fieldName, null);
    }

    /** Advances the reader and asserts an END_MAP/NULL event; {@code fieldName} is null for the record root. */
    private static void expectMapEnd(ChangeDataReader reader, long opTime, String fieldName) throws IOException {
        TestCDPSUtil.assertNodeEquals(TestCDPSUtil.moveToNextNodeReaderBinary(reader), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeEvent.END_MAP, ChangeOp.NULL, opTime, fieldName, null);
    }

    /** Optionally prints the record, then verifies its header for the given row key and record-level op. */
    private static void checkHeader(String rowKey, ChangeDataRecord record, ChangeOp recordOp, boolean print) throws IOException {
        if (print) {
            TestCDPSUtil.printCDRec(record);
        }
        TestCDPSUtil.verifyHeadBinary(record, false, rowKey.getBytes(), TestCDPSUtil.OpTimeVerifyMethod.LargerEqualGivenValue, 0L, recordOp);
    }

    /**
     * Expected bytes of one initial cell version: either the string
     * {@code <row>-<family><column>v<versionTag>} or the encoded long.
     */
    private static byte[] expectedInitValue(boolean stringValues, String rowKey, String family, String column, String versionTag, long numericValue) {
        return stringValues ? (rowKey + "-" + family + column + "v" + versionTag).getBytes() : Bytes.toBytes(numericValue);
    }

    /**
     * Consumes the five PUT nodes expected for one initial column from the
     * iterator: the newest one (timestamp larger than 5) followed by the
     * versions with timestamps 3, 2, 1 and 0.
     */
    private static void expectInitVersionsFromIter(Iterator<?> nodes, String rowKey, String family, String column, boolean stringValues) throws IOException {
        String fieldPath = family + "." + column;
        expectIterNode(nodes, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, ChangeOp.PUT, 5L, fieldPath, expectedInitValue(stringValues, rowKey, family, column, "CurrentTime", 1000L));
        for (long version = 3; version >= 0; version--) {
            expectIterNode(nodes, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.PUT, version, fieldPath, expectedInitValue(stringValues, rowKey, family, column, Long.toString(version), version));
        }
    }

    /** Prints the separator line and all three dumps of one record. */
    private static void dumpRecord(String rowKey, ChangeDataRecord record) throws Exception {
        System.out.println("\n------ " + rowKey + "--------\n");
        TestCDPSUtil.printChangeRecHeader(record);
        TestCDPSUtil.printChangeDataThroughIter(record);
        TestCDPSUtil.printChangeDataThroughReaderBinary(record);
    }

    /**
     * Verifies, through the iterator, the initial puts of a record replicated
     * with the column filter {@code fam1:col1,fam2}: only {@code fam1.col1},
     * {@code fam2.col0} and {@code fam2.col1} are expected.
     *
     * @param rowKey row key used to build the expected string values
     * @param changeDataRecord record under test
     * @param stringValues true to expect string-encoded values, false for long-encoded
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyInitPutsThroughIterWithCols1(String rowKey, ChangeDataRecord changeDataRecord, boolean stringValues) throws IOException {
        // replicated[f][c] tells whether column c of family f is expected in the record.
        final boolean[][] replicated = {
            {false, false},
            {false, true},
            {true, true},
        };
        Iterator<?> nodes = changeDataRecord.iterator();
        for (int fam = 0; fam < FAMCOUNT; fam++) {
            for (int col = 0; col < NumCols; col++) {
                if (replicated[fam][col]) {
                    expectInitVersionsFromIter(nodes, rowKey, FAM + fam, COL + col, stringValues);
                }
            }
        }
    }

    /**
     * Verifies, through the iterator, the initial puts of every column of
     * every family of a fully replicated record.
     *
     * @param rowKey row key used to build the expected string values
     * @param changeDataRecord record under test
     * @param stringValues true to expect string-encoded values, false for long-encoded
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyInitPutsThroughIter1(String rowKey, ChangeDataRecord changeDataRecord, boolean stringValues) throws IOException {
        Iterator<?> nodes = changeDataRecord.iterator();
        for (int fam = 0; fam < FAMCOUNT; fam++) {
            for (int col = 0; col < NumCols; col++) {
                expectInitVersionsFromIter(nodes, rowKey, FAM + fam, COL + col, stringValues);
            }
        }
    }

    /**
     * Verifies, through the reader, a record holding one family-level
     * operation: root map start, the family node, root map end.
     *
     * @param rowKey unused; kept for signature compatibility
     * @param family expected field name of the node
     * @param changeOp expected operation of the node
     * @param opTime expected operation timestamp of the node
     * @param value expected node value, may be null
     * @param changeDataRecord record under test
     * @param print unused; kept for signature compatibility
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyOneFamOpThroughReader1(String rowKey, String family, ChangeOp changeOp, long opTime, byte[] value, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        ChangeDataReader reader = changeDataRecord.getReader();
        long serverTime = changeDataRecord.getServerTimestamp();
        expectMapStart(reader, serverTime, null);
        expectReaderNode(reader, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, changeOp, opTime, family, value);
        expectMapEnd(reader, serverTime, null);
    }

    /**
     * Verifies, through the reader, a record holding one column-level
     * operation nested inside its family map.
     *
     * @param rowKey unused; kept for signature compatibility
     * @param family expected family map name
     * @param column expected field name of the node
     * @param changeOp expected operation of the node
     * @param opTime timestamp the node's operation time is compared against
     * @param opTimeVerifyMethod how the node's operation time is compared to {@code opTime}
     * @param value expected node value, may be null
     * @param changeDataRecord record under test
     * @param print unused; kept for signature compatibility
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyOneOpThroughReader1(String rowKey, String family, String column, ChangeOp changeOp, long opTime, TestCDPSUtil.OpTimeVerifyMethod opTimeVerifyMethod, byte[] value, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        ChangeDataReader reader = changeDataRecord.getReader();
        long serverTime = changeDataRecord.getServerTimestamp();
        expectMapStart(reader, serverTime, null);
        expectMapStart(reader, serverTime, family);
        expectReaderNode(reader, opTimeVerifyMethod, changeOp, opTime, column, value);
        expectMapEnd(reader, serverTime, family);
        expectMapEnd(reader, serverTime, null);
    }

    /**
     * Verifies, through the reader, the initial puts of every column of every
     * family. Map events are compared against the record's operation
     * timestamp.
     *
     * @param rowKey row key used to build the expected string values
     * @param changeDataRecord record under test
     * @param stringValues true to expect string-encoded values, false for long-encoded
     * @param print unused; kept for signature compatibility
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyInitPutsThroughReader1(String rowKey, ChangeDataRecord changeDataRecord, boolean stringValues, boolean print) throws IOException {
        ChangeDataReader reader = changeDataRecord.getReader();
        long recordTime = changeDataRecord.getOpTimestamp();
        expectMapStart(reader, recordTime, null);
        for (int fam = 0; fam < FAMCOUNT; fam++) {
            String family = FAM + fam;
            expectMapStart(reader, recordTime, family);
            for (int col = 0; col < NumCols; col++) {
                String column = COL + col;
                expectReaderNode(reader, TestCDPSUtil.OpTimeVerifyMethod.LargerThanGivenValue, ChangeOp.PUT, 5L, column, expectedInitValue(stringValues, rowKey, family, column, "CurrentTime", 1000L));
                for (long version = 3; version >= 0; version--) {
                    expectReaderNode(reader, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.PUT, version, column, expectedInitValue(stringValues, rowKey, family, column, Long.toString(version), version));
                }
            }
            expectMapEnd(reader, recordTime, family);
        }
        expectMapEnd(reader, recordTime, null);
    }

    /**
     * Verifies the header (record op SET) and the full initial content of a
     * record through both the iterator and the reader.
     *
     * @param rowKey expected row key
     * @param changeDataRecord record under test
     * @param stringValues true to expect string-encoded values, false for long-encoded
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyInitPuts(String rowKey, ChangeDataRecord changeDataRecord, boolean stringValues, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.SET, print);
        verifyInitPutsThroughIter1(rowKey, changeDataRecord, stringValues);
        verifyInitPutsThroughReader1(rowKey, changeDataRecord, stringValues, print);
    }

    /**
     * Verifies the header (record op SET) and, through the iterator only, the
     * initial content of a record replicated with a column filter.
     *
     * @param rowKey expected row key
     * @param changeDataRecord record under test
     * @param stringValues true to expect string-encoded values, false for long-encoded
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyInitPutWithCols(String rowKey, ChangeDataRecord changeDataRecord, boolean stringValues, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.SET, print);
        verifyInitPutsThroughIterWithCols1(rowKey, changeDataRecord, stringValues);
    }

    /**
     * Verifies a record that, for {@code col1} of every family, carries a
     * DELETE_EXACT at timestamp 3 followed by a PUT at timestamp 6. Checked
     * through the iterator first, then through the reader.
     *
     * @param rowKey expected row key
     * @param changeDataRecord record under test
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyMutations(String rowKey, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.MERGE, print);
        final String column = COL + 1;

        Iterator<?> nodes = changeDataRecord.iterator();
        for (int fam = 0; fam < FAMCOUNT; fam++) {
            String fieldPath = FAM + fam + "." + column;
            expectIterNode(nodes, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.DELETE_EXACT, 3L, fieldPath, null);
            // NOTE(review): this is string concatenation, so family 0 expects "-mutated-value01";
            // confirm against what TestDeletes writes.
            expectIterNode(nodes, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.PUT, 6L, fieldPath, ("-mutated-value" + fam + 1).getBytes());
        }

        ChangeDataReader reader = changeDataRecord.getReader();
        long serverTime = changeDataRecord.getServerTimestamp();
        expectMapStart(reader, serverTime, null);
        for (int fam = 0; fam < FAMCOUNT; fam++) {
            String family = FAM + fam;
            expectMapStart(reader, serverTime, family);
            expectReaderNode(reader, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.DELETE_EXACT, 3L, column, null);
            expectReaderNode(reader, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.PUT, 6L, column, ("-mutated-value" + fam + 1).getBytes());
            expectMapEnd(reader, serverTime, family);
        }
        expectMapEnd(reader, serverTime, null);
    }

    /**
     * Verifies a record carrying a single PUT on {@code fam0.col0} whose
     * value is {@code <rowKey>-fam0col0vCurrentTime2}, through both the
     * iterator and the reader.
     *
     * @param rowKey expected row key, also used to build the expected value
     * @param changeDataRecord record under test
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyOnePut(String rowKey, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.MERGE, print);
        // Build the expected value from the row key for both views; the reader check
        // previously hard-coded "row0" and ignored the parameter.
        byte[] expected = (rowKey + "-fam0col0vCurrentTime2").getBytes();
        expectIterNode(changeDataRecord.iterator(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.PUT, changeDataRecord.getOpTimestamp(), "fam0.col0", expected);
        verifyOneOpThroughReader1(rowKey, "fam0", "col0", ChangeOp.PUT, changeDataRecord.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, expected, changeDataRecord, print);
    }

    /**
     * Verifies, through the iterator, a column-filtered record carrying PUTs
     * on {@code fam1.col1} and {@code fam2.col0}.
     *
     * @param rowKey expected row key, also used to build the expected values
     * @param changeDataRecord record under test
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyOnePutWithCol(String rowKey, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.MERGE, print);
        Iterator<?> nodes = changeDataRecord.iterator();
        expectIterNode(nodes, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.PUT, changeDataRecord.getOpTimestamp(), "fam1.col1", (rowKey + "-fam1col1vCurrentTime2").getBytes());
        expectIterNode(nodes, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.PUT, changeDataRecord.getOpTimestamp(), "fam2.col0", (rowKey + "-fam2col0vCurrentTime2").getBytes());
    }

    /**
     * Verifies a record carrying a single PUT on an arbitrary column, through
     * both the iterator and the reader.
     *
     * @param rowKey expected row key
     * @param changeDataRecord record under test
     * @param family family of the column
     * @param column column name
     * @param value expected value bytes
     * @param opTime expected operation timestamp of the PUT
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyGeneralPut(String rowKey, ChangeDataRecord changeDataRecord, String family, String column, byte[] value, long opTime, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.MERGE, print);
        expectIterNode(changeDataRecord.iterator(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, ChangeOp.PUT, opTime, family + "." + column, value);
        verifyOneOpThroughReader1(rowKey, family, column, ChangeOp.PUT, opTime, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, value, changeDataRecord, print);
    }

    /**
     * Verifies, through the iterator, a column-filtered record carrying the
     * same operation on {@code fam1.col1} and {@code fam2.col1}.
     *
     * @param rowKey expected row key
     * @param family unused; kept for signature compatibility
     * @param column unused; kept for signature compatibility
     * @param changeOp expected operation of both nodes
     * @param opTime timestamp the nodes' operation time is compared against
     * @param opTimeVerifyMethod how the operation time is compared to {@code opTime}
     * @param value expected node value, may be null
     * @param changeDataRecord record under test
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyColOpWithCol(String rowKey, String family, String column, ChangeOp changeOp, long opTime, TestCDPSUtil.OpTimeVerifyMethod opTimeVerifyMethod, byte[] value, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.MERGE, print);
        Iterator<?> nodes = changeDataRecord.iterator();
        expectIterNode(nodes, opTimeVerifyMethod, changeOp, opTime, "fam1.col1", value);
        expectIterNode(nodes, opTimeVerifyMethod, changeOp, opTime, "fam2.col1", value);
    }

    /**
     * Verifies a record carrying one column-level operation, through the
     * reader first and then through the iterator.
     *
     * @param rowKey expected row key
     * @param family family of the column
     * @param column column name
     * @param changeOp expected operation
     * @param opTime timestamp the node's operation time is compared against
     * @param opTimeVerifyMethod how the operation time is compared to {@code opTime}
     * @param value expected node value, may be null
     * @param changeDataRecord record under test
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyColOp(String rowKey, String family, String column, ChangeOp changeOp, long opTime, TestCDPSUtil.OpTimeVerifyMethod opTimeVerifyMethod, byte[] value, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.MERGE, print);
        verifyOneOpThroughReader1(rowKey, family, column, changeOp, opTime, opTimeVerifyMethod, value, changeDataRecord, print);
        expectIterNode(changeDataRecord.iterator(), opTimeVerifyMethod, changeOp, opTime, family + "." + column, value);
    }

    /**
     * Verifies, through the iterator, a column-filtered record carrying the
     * same family-level operation on {@code fam1} and {@code fam2}.
     *
     * @param rowKey expected row key
     * @param family unused; kept for signature compatibility
     * @param changeOp expected operation of both nodes
     * @param opTime expected operation timestamp of both nodes
     * @param value expected node value, may be null
     * @param changeDataRecord record under test
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyFamOpWithCol(String rowKey, String family, ChangeOp changeOp, long opTime, byte[] value, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.MERGE, print);
        Iterator<?> nodes = changeDataRecord.iterator();
        expectIterNode(nodes, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, changeOp, opTime, "fam1", value);
        expectIterNode(nodes, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, changeOp, opTime, "fam2", value);
    }

    /**
     * Verifies a record carrying one family-level operation, through the
     * reader first (always expecting a null value there) and then through the
     * iterator (expecting {@code value}).
     *
     * @param rowKey expected row key
     * @param family family name
     * @param changeOp expected operation
     * @param opTime expected operation timestamp
     * @param value expected node value for the iterator check, may be null
     * @param changeDataRecord record under test
     * @param print true to print the record before checking it
     * @throws IOException declared by the underlying assertion helpers
     */
    public static void verifyFamOp(String rowKey, String family, ChangeOp changeOp, long opTime, byte[] value, ChangeDataRecord changeDataRecord, boolean print) throws IOException {
        checkHeader(rowKey, changeDataRecord, ChangeOp.MERGE, print);
        verifyOneFamOpThroughReader1(rowKey, family, changeOp, opTime, null, changeDataRecord, print);
        expectIterNode(changeDataRecord.iterator(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, changeOp, opTime, family, value);
    }

    /**
     * Replicates a whole binary table to a changelog stream, verifies the
     * five initial row records, then applies the {@code delete} workload and
     * verifies the twelve resulting change records. Prints per-phase timings
     * at the end.
     *
     * @throws Exception on any setup, fetch or verification failure
     */
    @Test
    public void testBinaryConsumer() throws Exception {
        long tStart = System.nanoTime();
        createTable(BinTableName1);
        long tTableLoaded = System.nanoTime();
        TestCDPSUtil.replaceStreamTable(Chglogdst1, true, 1);
        TestCDPSUtil.setupCDPSReplicaWithColumns(BinTableName1, Chglogdst1, TopicFullName, false, null);
        long tChangelogReady = System.nanoTime();

        KafkaConsumer<byte[], ChangeDataRecord> consumer = TestCDPSUtil.startConsumer(TopicFullName);
        try {
            List<ConsumerRecord<byte[], ChangeDataRecord>> initial = TestCDPSUtil.fetchChangeDataWithBreak(NumRows2, consumer, 60);
            Assert.assertEquals(NumRows2, initial.size());
            long tInitialFetched = System.nanoTime();

            // Rows below NumRows1 are checked against string values, the rest against long values.
            for (int row = 0; row < NumRows2; row++) {
                ChangeDataRecord record = initial.get(row).value();
                String rowKey = ROW + row;
                dumpRecord(rowKey, record);
                verifyInitPuts(rowKey, record, row < NumRows1, false);
            }
            long tInitialVerified = System.nanoTime();

            deleteRecFromTable(BinTableName1);
            long tMutated = System.nanoTime();
            List<ConsumerRecord<byte[], ChangeDataRecord>> changes = TestCDPSUtil.fetchChangeDataWithBreak(12, consumer, 20);
            long tChangesFetched = System.nanoTime();
            // Fail with a clear message instead of an index error if replication fell short.
            Assert.assertEquals(12L, changes.size());

            verifyOnePut("row0", changes.get(0).value(), false);
            ChangeDataRecord rec1 = changes.get(1).value();
            verifyColOp("row0", "fam0", "col1", ChangeOp.DELETE_EXACT, rec1.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.LessEqualGivenValue, null, rec1, false);
            verifyColOp("row0", "fam0", "col0", ChangeOp.DELETE_EXACT, 1L, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, changes.get(2).value(), false);
            ChangeDataRecord rec3 = changes.get(3).value();
            verifyColOp("row0", "fam1", "col0", ChangeOp.DELETE, rec3.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, rec3, false);
            verifyColOp("row0", "fam1", "col1", ChangeOp.DELETE, 2L, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, changes.get(4).value(), false);
            ChangeDataRecord rec5 = changes.get(5).value();
            verifyFamOp("row0", "fam1", ChangeOp.DELETE, rec5.getOpTimestamp(), null, rec5, false);
            verifyFamOp("row0", "fam2", ChangeOp.DELETE, 2L, null, changes.get(6).value(), false);
            verifyFamOp("row0", "fam2", ChangeOp.DELETE, 3L, null, changes.get(7).value(), false);
            TestCDPSUtil.verifyRowDeleteBinary("row1", changes.get(8).value());
            ChangeDataRecord rec9 = changes.get(9).value();
            verifyGeneralPut("row2", rec9, "fam0", "col0", "row2-fam0col0vCurrentTime-append-value1".getBytes(), rec9.getOpTimestamp(), false);
            ChangeDataRecord rec10 = changes.get(10).value();
            verifyGeneralPut("row3", rec10, "fam0", "col0", Bytes.toBytes(1002L), rec10.getOpTimestamp(), false);
            verifyMutations("row2", changes.get(11).value(), false);
            long tChangesVerified = System.nanoTime();

            // NOTE(review): the last phase reuses the "analyzeInitData" label; kept as-is.
            System.out.println("\nTime(ns) createSetLoadSrcTable start:" + tStart + "\t end:" + tTableLoaded + "\t taken:" + (tTableLoaded - tStart)
                    + " \n \nTime(ns) createSetupChangelog  start:" + tTableLoaded + "\t end:" + tChangelogReady + "\t taken:" + (tChangelogReady - tTableLoaded)
                    + " \n \nTime(ns) copyRegionPhase       start:" + tChangelogReady + "\t end:" + tInitialFetched + "\t taken:" + (tInitialFetched - tChangelogReady)
                    + " \n \nTime(ns) analyzeInitData       start:" + tInitialFetched + "\t end:" + tInitialVerified + "\t taken:" + (tInitialVerified - tInitialFetched)
                    + " \n \nTime(ns) putMoreSrcData        start:" + tInitialVerified + "\t end:" + tMutated + "\t taken:" + (tMutated - tInitialVerified)
                    + " \n \nTime(ns) bucketReplPhase       start:" + tMutated + "\t end:" + tChangesFetched + "\t taken:" + (tChangesFetched - tMutated)
                    + " \n \nTime(ns) analyzeInitData       start:" + tChangesFetched + "\t end:" + tChangesVerified + "\t taken:" + (tChangesVerified - tChangesFetched)
                    + " \n ");
        } finally {
            // Release the consumer even when a verification fails.
            consumer.close();
        }
    }

    /**
     * Replicates a table with the column filter {@code fam1:col1,fam2},
     * verifies the five initial row records, then applies the {@code colop}
     * workload and verifies the seven resulting change records.
     *
     * @throws Exception on any setup, fetch or verification failure
     */
    @Test
    public void testReplWithColumns() throws Exception {
        final String tablePath = "/tmp/tcolsrc1";
        final String streamPath = "/tmp/chglogcoldstbin1";
        final String topic = "/tmp/chglogcoldstbin1:tcolsrc1";

        createTable(tablePath);
        TestCDPSUtil.replaceStreamTable(streamPath, true, 1);
        TestCDPSUtil.setupCDPSReplicaWithColumns(tablePath, streamPath, topic, false, "fam1:col1,fam2");

        KafkaConsumer<byte[], ChangeDataRecord> consumer = TestCDPSUtil.startConsumer(topic);
        try {
            List<ConsumerRecord<byte[], ChangeDataRecord>> initial = TestCDPSUtil.fetchChangeDataWithBreak(NumRows2, consumer, 60);
            Assert.assertEquals(NumRows2, initial.size());

            // Rows below NumRows1 are checked against string values, the rest against long values.
            for (int row = 0; row < NumRows2; row++) {
                ChangeDataRecord record = initial.get(row).value();
                String rowKey = ROW + row;
                dumpRecord(rowKey, record);
                verifyInitPutWithCols(rowKey, record, row < NumRows1, false);
            }

            colOpOnTable(tablePath);
            System.out.println("\n------ Done Col Ops, wait for the data been replicated --------\n");
            List<ConsumerRecord<byte[], ChangeDataRecord>> changes = TestCDPSUtil.fetchChangeDataWithBreak(7, consumer, 20);
            Assert.assertEquals(7L, changes.size());

            verifyOnePutWithCol("row0", changes.get(0).value(), false);
            ChangeDataRecord rec1 = changes.get(1).value();
            verifyColOpWithCol("row0", null, null, ChangeOp.DELETE_EXACT, rec1.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.LessEqualGivenValue, null, rec1, false);
            verifyColOpWithCol("row0", null, null, ChangeOp.DELETE_EXACT, 1L, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, changes.get(2).value(), false);
            verifyColOpWithCol("row0", null, null, ChangeOp.DELETE, 2L, TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, changes.get(3).value(), false);
            ChangeDataRecord rec4 = changes.get(4).value();
            verifyColOpWithCol("row0", null, null, ChangeOp.DELETE, rec4.getOpTimestamp(), TestCDPSUtil.OpTimeVerifyMethod.EqualGivenValue, null, rec4, false);
            verifyFamOpWithCol("row0", null, ChangeOp.DELETE, 3L, null, changes.get(5).value(), false);
            ChangeDataRecord rec6 = changes.get(6).value();
            verifyFamOpWithCol("row0", null, ChangeOp.DELETE, rec6.getOpTimestamp(), null, rec6, false);
        } finally {
            // Release the consumer even when a verification fails.
            consumer.close();
        }
    }
}
