/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.demo;

import java.io.FileInputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DemoConsumer {
    public static List<String> streamNames = new ArrayList<String>();
    public static KafkaConsumer consumer;
    public static String consumerConfig;
    public static String consumerType;
    public static int waitBetweenScr;

    public static void usage() {
        System.err.println("DemoConsumer -path <topic-full-name> -consumerConfig <configfile>");
        System.exit(1);
    }

    public static void main(String[] args) throws IOException {
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-path")) {
                String[] tmp;
                if (++i >= args.length) {
                    DemoConsumer.usage();
                }
                for (String oneName : tmp = args[i].split(",")) {
                    streamNames.add(oneName);
                }
                System.out.println("Starting consumer on " + args[i]);
                continue;
            }
            if (args[i].equals("-type")) {
                if (++i >= args.length) {
                    DemoConsumer.usage();
                }
                consumerType = args[i];
                continue;
            }
            if (args[i].equals("-consumerConfig")) {
                if (++i >= args.length) {
                    DemoConsumer.usage();
                }
                consumerConfig = args[i];
                continue;
            }
            DemoConsumer.usage();
        }
        if (streamNames.size() == 0) {
            DemoConsumer.usage();
        }
        if (consumerType == null) {
            consumerType = new String("analyzer");
        }
        System.out.println("starting a " + consumerType + " consumer");
        Properties props = new Properties();
        if (consumerConfig != null) {
            props.load(new FileInputStream(consumerConfig));
        }
        if (props.getProperty("bootstrap.servers") == null) {
            props.put("bootstrap.servers", "localhost:9092");
        }
        if (props.getProperty("key.deserializer") == null) {
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        }
        if (props.getProperty("value.deserializer") == null) {
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        }
        consumer = new KafkaConsumer(props);
        DemoConsumer.consumeTransactions();
    }

    public static void consumeTransactions() {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss,SSS");
        int numStreams = streamNames.size();
        int currentStream = 0;
        try {
            long lastTime;
            consumer.subscribe(streamNames);
            currentStream = numStreams;
            long totalBad = 0L;
            long total = 0L;
            long currTotal = 0L;
            long currBad = 0L;
            long lastSubscribedTime = lastTime = System.currentTimeMillis();
            long lastReport = 0L;
            long lapseTime = 0L;
            long maxTime = 0L;
            while (true) {
                ConsumerRecords crecs = consumer.poll(1000L);
                Iterator iter = crecs.iterator();
                long currentTime = System.currentTimeMillis();
                while (iter.hasNext()) {
                    ConsumerRecord crec = (ConsumerRecord)iter.next();
                    byte[] key = (byte[])crec.key();
                    byte[] value = (byte[])crec.value();
                    String log = new String(value);
                    if (log.contains("Suspicious")) {
                        long currTime = System.currentTimeMillis();
                        String[] parts = log.split(":");
                        long transactionTime = Long.parseLong(parts[0]);
                        String keyStr = new String(key);
                        if (consumerType.equals("analyzer")) {
                            lapseTime = currTime - transactionTime;
                            if (lapseTime > maxTime) {
                                maxTime = lapseTime;
                            }
                            System.out.println(sdf.format(currTime) + " " + currTime + ":" + keyStr + ":" + "bad transaction @" + transactionTime + " found within:" + (currTime - transactionTime) + "ms max lapse:" + maxTime + "ms");
                        }
                        ++totalBad;
                        ++currBad;
                    }
                    ++total;
                    ++currTotal;
                }
                if (currentStream < numStreams && lastSubscribedTime + (long)waitBetweenScr < currentTime) {
                    System.out.println("Subscribing to stream " + streamNames.get(currentStream));
                    consumer.subscribe(streamNames);
                    ++currentStream;
                    lastSubscribedTime = currentTime;
                }
                if (!consumerType.equals("freq") || total <= lastReport + 5000L) continue;
                double tP = (double)totalBad * 100.0 / (double)total;
                double cP = (double)currBad * 100.0 / (double)currTotal;
                System.out.println("Total transactions processed: " + total);
                System.out.println("Total suspicious transactions flagged: " + totalBad);
                System.out.printf("Current bad transaction rate : %(,.3f\n", tP);
                currTotal = 0L;
                currBad = 0L;
                long timelapse = System.currentTimeMillis() - lastTime;
                lastTime = System.currentTimeMillis();
                lastReport = total;
                if (!(cP > tP + 0.01)) continue;
                System.out.println("Saw more than 0.01% rise in bad transactions in last " + timelapse + "ms");
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            System.out.println("Consumer failed");
            return;
        }
    }

    static {
        consumerConfig = null;
        consumerType = null;
        waitBetweenScr = 60000;
    }
}

