/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.lab;

import com.mapr.baseutils.BinaryString;
import com.mapr.db.cdc.impl.ChangeDataReaderImpl;
import com.mapr.db.cdc.impl.ChangeDataRecordImpl;
import com.mapr.db.cdc.impl.ChangeNodeImpl;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.FieldPath;
import org.ojai.Value;
import org.ojai.store.cdc.ChangeDataRecord;
import org.ojai.store.cdc.ChangeDataRecordType;
import org.ojai.store.cdc.ChangeNode;
import org.ojai.store.cdc.ChangeOp;
import org.ojai.types.OTimestamp;

/**
 * Command-line consumer that reads change-data records from one or more
 * topics and keeps a running "employee name : favorite candy" report, which
 * it reprints after every record.
 *
 * <p>Recognized arguments (see {@link #main(String[])}):
 * {@code -path <name[,name...]>}, {@code -type <type>} and
 * {@code -consumerConfig <properties file>}.
 */
public class NewEmpReport {
    /** Topic names to subscribe to; filled from the comma-separated {@code -path} argument. */
    public static List<String> streamNames = new ArrayList<String>();
    /** Consumer created by {@link #main(String[])} and used by {@link #consumeTransactions()}. */
    public static KafkaConsumer<byte[], ChangeDataRecord> consumer;
    /** Path of an optional properties file given with {@code -consumerConfig}; {@code null} if absent. */
    public static String consumerConfig = null;
    /** Value of {@code -type}; defaults to {@code "chglog"}. Within this class it is only printed. */
    public static String consumerType = null;
    /** Not referenced anywhere else in this class; kept because it is a public field. */
    public static int waitBetweenScr = 60000;

    /** Prints the command-line synopsis to standard error and terminates the JVM with status 1. */
    public static void usage() {
        System.err.println("NewEmpReport -path <topic-full-name> -consumerConfig <configfile>");
        System.exit(1);
    }

    /**
     * Parses the command line, builds the consumer configuration and starts
     * consuming.
     *
     * <p>Properties are loaded from the {@code -consumerConfig} file when one is
     * given; {@code bootstrap.servers}, {@code key.deserializer} and
     * {@code value.deserializer} are only filled in when the file did not set
     * them.
     *
     * @param args command-line arguments; an unknown option, a missing option
     *             value or the absence of {@code -path} ends in {@link #usage()}
     * @throws IOException if the configuration file cannot be opened or read
     */
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < args.length; ++i) {
            String option = args[i];
            if (option.equals("-path")) {
                String names = optionValue(args, ++i);
                for (String oneName : names.split(",")) {
                    streamNames.add(oneName);
                }
                System.out.println("Starting consumer on " + names);
            } else if (option.equals("-type")) {
                consumerType = optionValue(args, ++i);
            } else if (option.equals("-consumerConfig")) {
                consumerConfig = optionValue(args, ++i);
            } else {
                NewEmpReport.usage();
            }
        }
        if (streamNames.isEmpty()) {
            NewEmpReport.usage();
        }
        if (consumerType == null) {
            consumerType = "chglog";
        }
        System.out.println("starting a " + consumerType + " consumer");

        Properties props = new Properties();
        if (consumerConfig != null) {
            // Close the stream once the properties are read; Properties.load does not close it.
            try (FileInputStream in = new FileInputStream(consumerConfig)) {
                props.load(in);
            }
        }
        setDefault(props, "bootstrap.servers", "localhost:9092");
        setDefault(props, "key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        setDefault(props, "value.deserializer", "com.mapr.streams.ChangeDataRecordDeserializer");

        consumer = new KafkaConsumer<byte[], ChangeDataRecord>(props);
        NewEmpReport.consumeTransactions();
    }

    /** Returns {@code args[index]}, or calls {@link #usage()} when the option has no value. */
    private static String optionValue(String[] args, int index) {
        if (index >= args.length) {
            NewEmpReport.usage();
        }
        return args[index];
    }

    /** Sets {@code key} to {@code value} only when {@code props} has no value for it yet. */
    private static void setDefault(Properties props, String key, String value) {
        if (props.getProperty(key) == null) {
            props.put(key, value);
        }
    }

    /**
     * Prints every (field path, change node) pair of the record, one per line,
     * numbered from zero.
     *
     * @param cdr record whose change nodes are listed
     */
    public static void printChangeDataThroughIter(ChangeDataRecord cdr) {
        int count = 0;
        for (Map.Entry<FieldPath, ChangeNode> entry : cdr) {
            FieldPath fpName = entry.getKey();
            ChangeNode cd = entry.getValue();
            System.out.println("node" + count + ": field:" + fpName + ", value:" + cd.toString());
            ++count;
        }
    }

    /**
     * Prints one line of {@code name:candy, } pairs, ordered by employee id,
     * followed by a line break.
     *
     * @param idToName  employee id to name; its keys drive the output
     * @param idToCandy employee id to favorite candy; a missing entry prints as {@code null}
     */
    public static void printMap(Map<String, String> idToName, Map<String, String> idToCandy) {
        // TreeMap copy gives a stable, id-sorted order regardless of the caller's map type.
        Map<String, String> sortedById = new TreeMap<String, String>(idToName);
        for (Map.Entry<String, String> entry : sortedById.entrySet()) {
            String candy = idToCandy.get(entry.getKey());
            System.out.print(entry.getValue() + ":" + candy + ", ");
        }
        System.out.println();
    }

    /**
     * Applies one change-data record to the two report maps and prints the
     * updated report.
     *
     * <p>A {@code RECORD_DELETE} removes the employee from both maps. Otherwise
     * each change node is applied by field path: the empty/root path carries a
     * whole document whose {@code name} and {@code favoriteCandy} entries are
     * copied; the {@code name} and {@code favoriteCandy} paths (matched
     * case-insensitively) handle {@code SET} and {@code DELETE} operations.
     * Other paths are ignored.
     *
     * <p>NOTE(review): the record id is read with {@code getString()}, so this
     * presumably expects string ids only - confirm against the producers.
     *
     * @param cdr       record to apply
     * @param idToName  employee id to name, updated in place
     * @param idToCandy employee id to favorite candy, updated in place
     */
    public static void collectCandy(ChangeDataRecord cdr, Map<String, String> idToName, Map<String, String> idToCandy) {
        String empId = cdr.getId().getString();
        if (cdr.getType() == ChangeDataRecordType.RECORD_DELETE) {
            idToName.remove(empId);
            idToCandy.remove(empId);
            printReport(idToName, idToCandy);
            return;
        }
        for (Map.Entry<FieldPath, ChangeNode> entry : cdr) {
            String fieldName = entry.getKey().asPathString();
            ChangeNode node = entry.getValue();
            if (fieldName == null || fieldName.equals("")) {
                applyWholeDocument(node, empId, idToName, idToCandy);
            } else if (fieldName.equalsIgnoreCase("name")) {
                applyNameChange(node, empId, idToName);
            } else if (fieldName.equalsIgnoreCase("favoriteCandy")) {
                applyCandyChange(node, empId, idToCandy);
            }
        }
        printReport(idToName, idToCandy);
    }

    /** Copies {@code name} and {@code favoriteCandy} out of a root-level node's map value, if present. */
    private static void applyWholeDocument(ChangeNode node, String empId, Map<String, String> idToName, Map<String, String> idToCandy) {
        Value value = node.getValue();
        if (value == null) {
            // Nothing to copy; previously this dereferenced the missing value and failed.
            return;
        }
        Map<String, Object> rec = value.getMap();
        if (rec.containsKey("name")) {
            idToName.put(empId, (String) rec.get("name"));
        }
        if (rec.containsKey("favoriteCandy")) {
            idToCandy.put(empId, (String) rec.get("favoriteCandy"));
        }
    }

    /** Applies a SET or DELETE on the {@code name} field to {@code idToName}. */
    private static void applyNameChange(ChangeNode node, String empId, Map<String, String> idToName) {
        ChangeOp op = node.getOp();
        if (op == ChangeOp.SET) {
            // Read the string only for SET; a DELETE node is not asked for a value.
            String newName = node.getString();
            if (differsIgnoreCase(newName, idToName.get(empId))) {
                idToName.put(empId, newName);
            }
        } else if (op == ChangeOp.DELETE) {
            // Deleting the name must drop the name entry (the candy map was cleared here by mistake).
            idToName.remove(empId);
        }
    }

    /** Applies a SET or DELETE on the {@code favoriteCandy} field to {@code idToCandy}. */
    private static void applyCandyChange(ChangeNode node, String empId, Map<String, String> idToCandy) {
        ChangeOp op = node.getOp();
        if (op == ChangeOp.SET) {
            Value vCandy = node.getValue();
            if (vCandy == null || vCandy.getType() == Value.Type.NULL) {
                // A null candy is recorded as the literal text "NULL"; do not fall
                // through and try to read it as a string.
                idToCandy.put(empId, "NULL");
                return;
            }
            String newCandy = node.getString();
            if (differsIgnoreCase(newCandy, idToCandy.get(empId))) {
                idToCandy.put(empId, newCandy);
            }
        } else if (op == ChangeOp.DELETE) {
            idToCandy.remove(empId);
        }
    }

    /** Null-safe test: {@code true} unless both are null or equal ignoring case. */
    private static boolean differsIgnoreCase(String updated, String current) {
        if (updated == null) {
            return current != null;
        }
        return !updated.equalsIgnoreCase(current);
    }

    /** Prints the report framed by its header and footer lines. */
    private static void printReport(Map<String, String> idToName, Map<String, String> idToCandy) {
        System.out.println("============== report================ ");
        NewEmpReport.printMap(idToName, idToCandy);
        System.out.println("===================================== ");
    }

    /**
     * Prints every change node obtained from the record's reader, one per
     * line, numbered from zero.
     *
     * @param cdr record to dump; its reader is cast to {@link ChangeDataReaderImpl}
     */
    public static void printChangeDataThroughReader(ChangeDataRecord cdr) {
        ChangeDataReaderImpl reader = (ChangeDataReaderImpl) cdr.getReader();
        int count = 0;
        while (reader.next() != null) {
            ChangeNodeImpl cd = reader.getChangeNode();
            System.out.println("node" + count + ":" + cd.toStringWithArrayIndexTime());
            ++count;
        }
    }

    /**
     * Subscribes {@link #consumer} to {@link #streamNames} and polls forever.
     * Each received record is described on standard output, dumped through
     * both the reader and the iterator, and applied to the report via
     * {@link #collectCandy}.
     *
     * <p>The loop only ends when an exception is thrown; the stack trace and
     * {@code "Consumer failed"} are printed and the consumer is closed.
     */
    public static void consumeTransactions() {
        Map<String, String> idToName = new HashMap<String, String>();
        Map<String, String> idToCandy = new HashMap<String, String>();
        try {
            consumer.subscribe(streamNames);
            int totalnum = 0;
            while (true) {
                ConsumerRecords<byte[], ChangeDataRecord> batch = consumer.poll(1000L);
                for (ConsumerRecord<byte[], ChangeDataRecord> crec : batch) {
                    ChangeDataRecordImpl value = (ChangeDataRecordImpl) crec.value();
                    System.out.println(describeRecord(value, totalnum));
                    System.out.println("--- print changenode through reader ---");
                    NewEmpReport.printChangeDataThroughReader(value);
                    System.out.println("--- print changenode through iterator---");
                    NewEmpReport.printChangeDataThroughIter(value);
                    NewEmpReport.collectCandy(value, idToName, idToCandy);
                    ++totalnum;
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            System.out.println("Consumer failed");
        }
        finally {
            // Release the consumer once the poll loop has failed; it may be null
            // if this method is called before main() created it.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /** Builds the two-line header printed for each record: format, sequence number, key, type and op time. */
    private static String describeRecord(ChangeDataRecordImpl value, int seq) {
        String format = value.isJson() ? "Json" : "Binary";
        Value id = value.getId();
        boolean stringKey = Value.Type.STRING == id.getType();
        String keyKind = stringKey ? "STRING" : "BINARY";
        String keyText = stringKey ? id.getString() : BinaryString.toStringBinary(id.getBinary().array());
        return "---" + format + " record" + seq
                + "\nkey(" + keyKind + ":" + keyText
                + ")  type(" + value.getType().name()
                + ") optime(" + value.getOpTimestamp() + " " + new OTimestamp(value.getOpTimestamp()) + ")";
    }
}

