/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests;

import com.mapr.db.exceptions.TableNotFoundException;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.MessageStore;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Random;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.Document;
import org.ojai.DocumentStream;
import org.ojai.FieldPath;
import org.ojai.store.DocumentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class BasicAnalyticsTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(BasicAnalyticsTest.class);
    private static final String PREFIX = "/jtest-" + BasicAnalyticsTest.class.getSimpleName() + "-";
    private static final String STREAM = PREFIX + "smallStream";
    private static Admin madmin;
    private static final int FEEDS = 26;
    private static List<TopicFeedMsg> allMsgsData;
    private static int totalMsgs;
    private static int totalExpectedSplits;

    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        String seedArg = System.getProperty("seed");
        long seed = -1L;
        if (seedArg != null) {
            seed = Long.parseLong(seedArg);
        }
        try {
            madmin.deleteStream(STREAM);
        }
        catch (TableNotFoundException e) {
            // empty catch block
        }
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(26);
        madmin.createStream(STREAM, sdesc);
        char c = 'A';
        int numFeeds = 1;
        totalExpectedSplits = 0;
        do {
            madmin.createTopic(STREAM, "topic" + c, numFeeds);
            totalExpectedSplits += numFeeds;
            c = (char)(c + '\u0001');
            ++numFeeds;
        } while (c <= 90);
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("streams.parallel.flushers.per.partition", (Object)false);
        KafkaProducer producer = new KafkaProducer(props);
        ProducerCallback cb = new ProducerCallback();
        allMsgsData = new ArrayList<TopicFeedMsg>();
        Random rng = null;
        rng = seed != -1L ? new Random(seed) : new Random();
        totalMsgs = 0;
        for (int i = 0; i < 50; ++i) {
            int topicId = rng.nextInt(26);
            int feed = topicId == 0 ? 0 : rng.nextInt(topicId);
            int numMsgs = rng.nextInt(50);
            for (int j = 0; j < numMsgs; ++j) {
                char ch = (char)(65 + topicId);
                String topicName = STREAM + ":topic" + ch;
                byte[] key = new byte[10];
                byte[] val = new byte[10];
                ProducerRecord record = new ProducerRecord(topicName, Integer.valueOf(feed), (Object)key, (Object)val);
                producer.send(record, (Callback)cb);
            }
            producer.flush();
            allMsgsData.add(new TopicFeedMsg(topicId, feed, numMsgs));
            totalMsgs += numMsgs;
        }
        producer.close();
    }

    private int countMessages(DocumentStream rs) throws Exception {
        Iterator iter = rs.iterator();
        int count = 0;
        while (iter.hasNext()) {
            ++count;
            iter.next();
        }
        return count;
    }

    private int countMessages(DocumentStore store) throws Exception {
        DocumentStream rs = store.find();
        return this.countMessages(rs);
    }

    @Test
    public void testAllMsgs() throws Exception {
        DocumentStore store = Streams.getMessageStore((String)STREAM);
        Assert.assertEquals((long)totalMsgs, (long)this.countMessages(store));
        Assert.assertEquals((long)totalExpectedSplits, (long)((MessageStore)store).getNumSplits());
    }

    private int getNumExpectedMsgs(int topicId) {
        int expectedMsgs = 0;
        for (TopicFeedMsg tfm : allMsgsData) {
            if (tfm.topicId != topicId) continue;
            expectedMsgs += tfm.numMsgs;
        }
        return expectedMsgs;
    }

    @Test
    public void testSingleTopicMsgs() throws Exception {
        Random rng = new Random();
        for (int i = 0; i < 10; ++i) {
            int topicId = rng.nextInt(26);
            char ch = (char)(65 + topicId);
            String topicName = "topic" + ch;
            DocumentStore store = Streams.getMessageStore((String)STREAM, (String[])new String[]{topicName});
            Assert.assertEquals((long)this.getNumExpectedMsgs(topicId), (long)this.countMessages(store));
            Assert.assertEquals((long)(topicId + 1), (long)((MessageStore)store).getNumSplits());
        }
    }

    @Test
    public void testMultiTopicMsgs() throws Exception {
        Random rng = new Random();
        HashSet<String> topics = new HashSet<String>();
        int expectedMsgs = 0;
        int expectedSplits = 0;
        for (int i = 0; i < 15; ++i) {
            int topicId = rng.nextInt(26);
            char ch = (char)(65 + topicId);
            if (!topics.add(new String("topic" + ch))) continue;
            expectedMsgs += this.getNumExpectedMsgs(topicId);
            expectedSplits += topicId + 1;
        }
        String[] topicArr = topics.toArray(new String[topics.size()]);
        DocumentStore store = Streams.getMessageStore((String)STREAM, (String[])topicArr);
        Assert.assertEquals((long)expectedMsgs, (long)this.countMessages(store));
        Assert.assertEquals((long)expectedSplits, (long)((MessageStore)store).getNumSplits());
    }

    @Test
    public void testRegexTopicMsgs() throws Exception {
        Random rng = new Random();
        HashSet<String> topics = new HashSet<String>();
        int expectedMsgs = 0;
        int expectedSplits = 0;
        for (int i = 0; i < 15; ++i) {
            int topicId = rng.nextInt(26);
            char ch = (char)(65 + topicId);
            if (!topics.add(new String("topic" + ch))) continue;
            expectedMsgs += this.getNumExpectedMsgs(topicId);
            expectedSplits += topicId + 1;
        }
        String regex = "(";
        int i = 0;
        for (String topic : topics) {
            if (i > 0) {
                regex = regex + "|";
            }
            regex = regex + topic;
            ++i;
        }
        regex = regex + ")";
        DocumentStore store = Streams.getMessageStore((String)STREAM, (Pattern)Pattern.compile(regex));
        Assert.assertEquals((long)expectedMsgs, (long)this.countMessages(store));
        Assert.assertEquals((long)expectedSplits, (long)((MessageStore)store).getNumSplits());
    }

    @Test
    public void testNonExistantTopic() throws Exception {
        DocumentStore store = Streams.getMessageStore((String)STREAM, (String[])new String[]{"topicNonExistant"});
        Assert.assertEquals((long)0L, (long)this.countMessages(store));
    }

    @Test
    public void testNullTopic() throws Exception {
        boolean exceptionCaught = false;
        try {
            DocumentStore store = Streams.getMessageStore((String)STREAM, (String[])new String[]{null});
        }
        catch (IllegalArgumentException e) {
            exceptionCaught = true;
        }
        Assert.assertTrue((boolean)exceptionCaught);
    }

    @Test
    public void testProjectionTopicKey() throws Exception {
        DocumentStore store = Streams.getMessageStore((String)STREAM);
        DocumentStream rs = store.find(new String[]{"topic", "key"});
        for (Document doc : rs) {
            Assert.assertNotEquals(null, (Object)doc.getBinary("key"));
            Assert.assertEquals(null, (Object)doc.getBinary("value"));
            boolean exceptionCaught = false;
            try {
                int feed = doc.getInt("partition");
            }
            catch (NoSuchElementException e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            exceptionCaught = false;
            try {
                long offset = doc.getLong("offset");
            }
            catch (NoSuchElementException e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            Assert.assertEquals(null, (Object)doc.getString("producer"));
            Assert.assertNotEquals(null, (Object)doc.getString("topic"));
        }
    }

    @Test
    public void testProjectionTopicPartition() throws Exception {
        DocumentStore store = Streams.getMessageStore((String)STREAM);
        DocumentStream rs = store.find(new String[]{"topic", "partition"});
        Iterator iter = rs.iterator();
        int count = 0;
        while (iter.hasNext()) {
            Document doc = (Document)iter.next();
            Assert.assertEquals(null, (Object)doc.getBinary("key"));
            Assert.assertEquals(null, (Object)doc.getBinary("value"));
            boolean exceptionCaught = false;
            try {
                int feed = doc.getInt("partition");
            }
            catch (NoSuchElementException e) {
                exceptionCaught = true;
            }
            Assert.assertFalse((boolean)exceptionCaught);
            exceptionCaught = false;
            try {
                long offset = doc.getLong("offset");
            }
            catch (NoSuchElementException e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            Assert.assertEquals(null, (Object)doc.getString("producer"));
            Assert.assertNotEquals(null, (Object)doc.getString("topic"));
            ++count;
        }
        Assert.assertEquals((long)totalMsgs, (long)count);
    }

    @Test
    public void testProjectionNull() throws Exception {
        boolean exceptionCaught = false;
        DocumentStore store = Streams.getMessageStore((String)STREAM);
        try {
            DocumentStream rs = store.find(new FieldPath[]{null});
        }
        catch (IllegalArgumentException e) {
            exceptionCaught = true;
        }
        Assert.assertTrue((boolean)exceptionCaught);
    }

    @Test
    public void testProjectionKeyValueProducer() throws Exception {
        DocumentStore store = Streams.getMessageStore((String)STREAM);
        DocumentStream rs = store.find(new String[]{"producer", "key", "value"});
        for (Document doc : rs) {
            Assert.assertNotEquals(null, (Object)doc.getBinary("key"));
            Assert.assertNotEquals(null, (Object)doc.getBinary("value"));
            boolean exceptionCaught = false;
            try {
                int feed = doc.getInt("partition");
            }
            catch (NoSuchElementException e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            exceptionCaught = false;
            try {
                long offset = doc.getLong("offset");
            }
            catch (NoSuchElementException e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            Assert.assertNotEquals(null, (Object)doc.getString("producer"));
            Assert.assertEquals(null, (Object)doc.getString("topic"));
        }
    }

    @Test
    public void testProjectionProducer() throws Exception {
        DocumentStore store = Streams.getMessageStore((String)STREAM);
        DocumentStream rs = store.find(new String[]{"producer"});
        Iterator iter = rs.iterator();
        int count = 0;
        while (iter.hasNext()) {
            Document doc = (Document)iter.next();
            Assert.assertEquals(null, (Object)doc.getBinary("key"));
            Assert.assertEquals(null, (Object)doc.getBinary("value"));
            boolean exceptionCaught = false;
            try {
                int feed = doc.getInt("partition");
            }
            catch (NoSuchElementException e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            exceptionCaught = false;
            try {
                long offset = doc.getLong("offset");
            }
            catch (NoSuchElementException e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            Assert.assertNotEquals(null, (Object)doc.getString("producer"));
            Assert.assertEquals(null, (Object)doc.getString("topic"));
            ++count;
        }
        Assert.assertEquals((long)totalMsgs, (long)count);
    }

    private static final class ProducerCallback
    implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
                System.exit(1);
            }
        }
    }

    static class TopicFeedMsg {
        public int topicId;
        public int feed;
        public int numMsgs;

        TopicFeedMsg(int t, int f, int m) {
            this.topicId = t;
            this.feed = f;
            this.numMsgs = m;
        }
    }
}

