package com.mapr.db.cdc.demo;

import com.mapr.baseutils.BinaryString;
import com.mapr.db.cdc.impl.ChangeDataReaderImpl;
import com.mapr.db.cdc.impl.ChangeDataRecordImpl;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.FieldPath;
import org.ojai.Value;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.types.OTimestamp;

/* loaded from: input_file:com/mapr/db/cdc/demo/TestChgLogConsumerJson.class */
/**
 * Command-line demo that subscribes a {@link KafkaConsumer} to one or more
 * topics and prints every {@link ChangeDataRecord} it receives to standard output.
 *
 * <p>Accepted arguments (see {@link #main(String[])}):
 * <ul>
 *   <li>{@code -path <name>[,<name>...]} - comma-separated names to subscribe to (required);</li>
 *   <li>{@code -consumerConfig <file>} - properties file used as consumer configuration;</li>
 *   <li>{@code -type <type>} - label echoed at start-up; defaults to {@code "chglog"}.</li>
 * </ul>
 *
 * <p>All state is held in public static fields, so the class is not designed
 * to run more than one consumer per JVM.
 */
public class TestChgLogConsumerJson {
    /** Consumer created by {@link #main(String[])} and polled by {@link #consumeTransactions()}. */
    public static KafkaConsumer<byte[], ChangeDataRecord> consumer;

    /** Names passed via {@code -path}; handed to {@code consumer.subscribe}. */
    public static List<String> streamNames = new ArrayList<>();

    /** Path of the properties file given via {@code -consumerConfig}, or {@code null} if none. */
    public static String consumerConfig = null;

    /** Value given via {@code -type}; only printed, never interpreted by this class. */
    public static String consumerType = null;

    /** Not referenced anywhere in this class; kept for compatibility with external users. */
    public static int waitBetweenScr = 60000;

    /**
     * Prints the usage line to standard error and terminates the JVM with exit status 1.
     */
    public static void usage() {
        // NOTE(review): the message does not mention the -type option that main() accepts.
        System.err.println("TestChgLogConsumerJson -path <topic-full-name> -consumerConfig <configfile>");
        System.exit(1);
    }

    /**
     * Parses the command line, builds the consumer configuration, creates the
     * consumer and starts consuming.
     *
     * <p>Any unknown option, an option without a value, or a missing
     * {@code -path} ends the process through {@link #usage()}.
     *
     * @param strArr command-line arguments
     * @throws IOException if the file named by {@code -consumerConfig} cannot be read
     */
    public static void main(String[] strArr) throws IOException {
        int i = 0;
        while (i < strArr.length) {
            String option = strArr[i];
            if (option.equals("-path")) {
                String paths = optionValue(strArr, ++i);
                for (String name : paths.split(",")) {
                    streamNames.add(name);
                }
                System.out.println("Starting consumer on " + paths);
            } else if (option.equals("-type")) {
                consumerType = optionValue(strArr, ++i);
            } else if (option.equals("-consumerConfig")) {
                consumerConfig = optionValue(strArr, ++i);
            } else {
                usage();
            }
            i++;
        }
        if (streamNames.isEmpty()) {
            usage();
        }
        if (consumerType == null) {
            consumerType = "chglog";
        }
        System.out.println("starting a " + consumerType + " consumer");

        Properties properties = new Properties();
        if (consumerConfig != null) {
            // try-with-resources: the stream used to be left open after loading.
            try (FileInputStream configStream = new FileInputStream(consumerConfig)) {
                properties.load(configStream);
            }
        }
        // Values from the config file win; these are only fallbacks.
        putIfMissing(properties, "bootstrap.servers", "localhost:9092");
        putIfMissing(properties, "key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        putIfMissing(properties, "value.deserializer", "com.mapr.streams.ChangeDataRecordDeserializer");

        consumer = new KafkaConsumer<>(properties);
        try {
            consumeTransactions();
        } finally {
            // consumeTransactions() only returns after a failure; release the consumer
            // that this method created instead of leaking it.
            consumer.close();
        }
    }

    /**
     * Returns the value following an option, or exits through {@link #usage()}
     * when the command line ends before the value.
     *
     * @param args  full command line
     * @param index position at which the value is expected
     * @return {@code args[index]}
     */
    private static String optionValue(String[] args, int index) {
        if (index >= args.length) {
            usage(); // terminates the JVM, so the access below is never out of range
        }
        return args[index];
    }

    /**
     * Stores {@code value} under {@code key} unless {@code getProperty(key)}
     * already yields a value.
     */
    private static void putIfMissing(Properties properties, String key, String value) {
        if (properties.getProperty(key) == null) {
            properties.put(key, value);
        }
    }

    /**
     * Prints every entry obtained from the record's iterator as
     * {@code node<N>: field:<key>, value:<value>}, numbering entries from 0.
     *
     * @param changeDataRecord record whose entries are printed
     */
    public static void printChangeDataThroughIter(ChangeDataRecord changeDataRecord) {
        Iterator<?> it = changeDataRecord.iterator();
        int i = 0;
        while (it.hasNext()) {
            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) it.next();
            FieldPath field = (FieldPath) entry.getKey();
            ChangeNode node = (ChangeNode) entry.getValue();
            System.out.println("node" + i + ": field:" + field + ", value:" + node.toString());
            i++;
        }
    }

    /**
     * Walks the record with its reader until {@code next()} returns {@code null}
     * and prints each change node as {@code node<N>:<text>}, numbering nodes from 0.
     *
     * @param changeDataRecord record whose change nodes are printed
     */
    public static void printChangeDataThroughReader(ChangeDataRecord changeDataRecord) {
        ChangeDataReaderImpl reader = changeDataRecord.getReader();
        int i = 0;
        while (reader.next() != null) {
            System.out.println("node" + i + ":" + reader.getChangeNode().toStringWithArrayIndexTime());
            i++;
        }
    }

    /**
     * Subscribes {@link #consumer} to {@link #streamNames} and polls forever,
     * printing a header plus both node listings for every record received.
     *
     * <p>The loop has no exit condition; the method returns only after an
     * exception, which is reported via its stack trace and the line
     * {@code "Consumer failed"}. The consumer is not closed here - that is left
     * to whoever created it.
     */
    public static void consumeTransactions() {
        try {
            consumer.subscribe(streamNames);
            int i = 0;
            while (true) {
                for (ConsumerRecord<byte[], ChangeDataRecord> consumerRecord : consumer.poll(1000L)) {
                    ChangeDataRecordImpl record = (ChangeDataRecordImpl) consumerRecord.value();

                    Value id = record.getId();
                    boolean stringKey = Value.Type.STRING == id.getType();
                    String format = record.isJson() ? "Json" : "Binary";
                    String keyType = stringKey ? "STRING" : "BINARY";
                    // Non-string ids are rendered from their raw bytes.
                    String keyText = stringKey
                            ? id.getString()
                            : BinaryString.toStringBinary(id.getBinary().array());
                    long opTime = record.getOpTimestamp();

                    System.out.println("---" + format + " record" + i
                            + "\nkey(" + keyType + ":" + keyText + ")"
                            + "  type(" + record.getType().name() + ")"
                            + " optime(" + opTime + " " + new OTimestamp(opTime) + ")");
                    System.out.println("--- print changenode through reader ---");
                    printChangeDataThroughReader(record);
                    System.out.println("--- print changenode through iterator---");
                    printChangeDataThroughIter(record);
                    i++;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Consumer failed");
        }
    }
}
