package com.mapr.streams.tests;

import com.mapr.db.exceptions.TableNotFoundException;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.MessageStore;
import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.MarlinAdmin;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Random;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.ojai.Document;
import org.ojai.DocumentStream;
import org.ojai.FieldPath;
import org.ojai.store.DocumentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/BasicAnalyticsTest.class */
public class BasicAnalyticsTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(BasicAnalyticsTest.class);
    private static final String PREFIX = "/jtest-" + BasicAnalyticsTest.class.getSimpleName() + "-";
    private static final String STREAM = PREFIX + "smallStream";
    private static MarlinAdmin madmin;
    private static final int FEEDS = 26;
    private static List<TopicFeedMsg> allMsgsData;
    private static int totalMsgs;
    private static int totalExpectedSplits;

    /* loaded from: input_file:com/mapr/streams/tests/BasicAnalyticsTest$ProducerCallback.class */
    private static final class ProducerCallback implements Callback {
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                exc.printStackTrace();
                System.exit(1);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/mapr/streams/tests/BasicAnalyticsTest$TopicFeedMsg.class */
    public static class TopicFeedMsg {
        public int topicId;
        public int feed;
        public int numMsgs;

        TopicFeedMsg(int i, int i2, int i3) {
            this.topicId = i;
            this.feed = i2;
            this.numMsgs = i3;
        }
    }

    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = new MarlinAdmin(new Configuration());
        String property = System.getProperty("seed");
        long parseLong = property != null ? Long.parseLong(property) : -1L;
        try {
            madmin.deleteStream(STREAM);
        } catch (TableNotFoundException e) {
        }
        MStreamDescriptor mStreamDescriptor = new MStreamDescriptor();
        mStreamDescriptor.setDefaultPartitions(FEEDS);
        madmin.createStream(STREAM, mStreamDescriptor);
        char c = 'A';
        int i = 1;
        totalExpectedSplits = 0;
        do {
            madmin.createTopic(STREAM + ":topic" + c, i);
            totalExpectedSplits += i;
            c = (char) (c + 1);
            i++;
        } while (c <= 'Z');
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("streams.parallel.flushers.per.partition", false);
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        ProducerCallback producerCallback = new ProducerCallback();
        allMsgsData = new ArrayList();
        Random random = parseLong != -1 ? new Random(parseLong) : new Random();
        totalMsgs = 0;
        for (int i2 = 0; i2 < 50; i2++) {
            int nextInt = random.nextInt(FEEDS);
            int nextInt2 = nextInt == 0 ? 0 : random.nextInt(nextInt);
            int nextInt3 = random.nextInt(50);
            for (int i3 = 0; i3 < nextInt3; i3++) {
                kafkaProducer.send(new ProducerRecord(STREAM + ":topic" + ((char) (65 + nextInt)), Integer.valueOf(nextInt2), new byte[10], new byte[10]), producerCallback);
            }
            kafkaProducer.flush();
            allMsgsData.add(new TopicFeedMsg(nextInt, nextInt2, nextInt3));
            totalMsgs += nextInt3;
        }
        kafkaProducer.close();
    }

    private int countMessages(DocumentStream documentStream) throws Exception {
        Iterator it = documentStream.iterator();
        int i = 0;
        while (it.hasNext()) {
            i++;
            it.next();
        }
        return i;
    }

    private int countMessages(DocumentStore documentStore) throws Exception {
        return countMessages(documentStore.find());
    }

    @Test
    public void testAllMsgs() throws Exception {
        MessageStore messageStore = Streams.getMessageStore(STREAM);
        Assert.assertEquals(totalMsgs, countMessages((DocumentStore) messageStore));
        Assert.assertEquals(totalExpectedSplits, messageStore.getNumSplits());
    }

    private int getNumExpectedMsgs(int i) {
        int i2 = 0;
        for (TopicFeedMsg topicFeedMsg : allMsgsData) {
            if (topicFeedMsg.topicId == i) {
                i2 += topicFeedMsg.numMsgs;
            }
        }
        return i2;
    }

    @Test
    public void testSingleTopicMsgs() throws Exception {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            MessageStore messageStore = Streams.getMessageStore(STREAM, new String[]{"topic" + ((char) (65 + random.nextInt(FEEDS)))});
            Assert.assertEquals(getNumExpectedMsgs(r0), countMessages((DocumentStore) messageStore));
            Assert.assertEquals(r0 + 1, messageStore.getNumSplits());
        }
    }

    @Test
    public void testMultiTopicMsgs() throws Exception {
        Random random = new Random();
        HashSet hashSet = new HashSet();
        int i = 0;
        int i2 = 0;
        for (int i3 = 0; i3 < 15; i3++) {
            int nextInt = random.nextInt(FEEDS);
            if (hashSet.add(new String("topic" + ((char) (65 + nextInt))))) {
                i += getNumExpectedMsgs(nextInt);
                i2 += nextInt + 1;
            }
        }
        MessageStore messageStore = Streams.getMessageStore(STREAM, (String[]) hashSet.toArray(new String[hashSet.size()]));
        Assert.assertEquals(i, countMessages((DocumentStore) messageStore));
        Assert.assertEquals(i2, messageStore.getNumSplits());
    }

    @Test
    public void testRegexTopicMsgs() throws Exception {
        Random random = new Random();
        HashSet<String> hashSet = new HashSet();
        int i = 0;
        int i2 = 0;
        for (int i3 = 0; i3 < 15; i3++) {
            int nextInt = random.nextInt(FEEDS);
            if (hashSet.add(new String("topic" + ((char) (65 + nextInt))))) {
                i += getNumExpectedMsgs(nextInt);
                i2 += nextInt + 1;
            }
        }
        String str = "(";
        int i4 = 0;
        for (String str2 : hashSet) {
            if (i4 > 0) {
                str = str + "|";
            }
            str = str + str2;
            i4++;
        }
        MessageStore messageStore = Streams.getMessageStore(STREAM, Pattern.compile(str + ")"));
        Assert.assertEquals(i, countMessages((DocumentStore) messageStore));
        Assert.assertEquals(i2, messageStore.getNumSplits());
    }

    @Test
    public void testNonExistantTopic() throws Exception {
        Assert.assertEquals(0L, countMessages(Streams.getMessageStore(STREAM, new String[]{"topicNonExistant"})));
    }

    @Test
    public void testNullTopic() throws Exception {
        boolean z = false;
        try {
            Streams.getMessageStore(STREAM, new String[]{(String) null});
        } catch (IllegalArgumentException e) {
            z = true;
        }
        Assert.assertTrue(z);
    }

    @Test
    public void testProjectionTopicKey() throws Exception {
        for (Document document : Streams.getMessageStore(STREAM).find(new String[]{"topic", "key"})) {
            Assert.assertNotEquals((Object) null, document.getBinary("key"));
            Assert.assertEquals((Object) null, document.getBinary("value"));
            boolean z = false;
            try {
                document.getInt("partition");
            } catch (NoSuchElementException e) {
                z = true;
            }
            Assert.assertTrue(z);
            boolean z2 = false;
            try {
                document.getLong("offset");
            } catch (NoSuchElementException e2) {
                z2 = true;
            }
            Assert.assertTrue(z2);
            Assert.assertEquals((Object) null, document.getString("producer"));
            Assert.assertNotEquals((Object) null, document.getString("topic"));
        }
    }

    @Test
    public void testProjectionTopicPartition() throws Exception {
        int i = 0;
        for (Document document : Streams.getMessageStore(STREAM).find(new String[]{"topic", "partition"})) {
            Assert.assertEquals((Object) null, document.getBinary("key"));
            Assert.assertEquals((Object) null, document.getBinary("value"));
            boolean z = false;
            try {
                document.getInt("partition");
            } catch (NoSuchElementException e) {
                z = true;
            }
            Assert.assertFalse(z);
            boolean z2 = false;
            try {
                document.getLong("offset");
            } catch (NoSuchElementException e2) {
                z2 = true;
            }
            Assert.assertTrue(z2);
            Assert.assertEquals((Object) null, document.getString("producer"));
            Assert.assertNotEquals((Object) null, document.getString("topic"));
            i++;
        }
        Assert.assertEquals(totalMsgs, i);
    }

    @Test
    public void testProjectionNull() throws Exception {
        boolean z = false;
        try {
            Streams.getMessageStore(STREAM).find(new FieldPath[]{(FieldPath) null});
        } catch (IllegalArgumentException e) {
            z = true;
        }
        Assert.assertTrue(z);
    }

    @Test
    public void testProjectionKeyValueProducer() throws Exception {
        for (Document document : Streams.getMessageStore(STREAM).find(new String[]{"producer", "key", "value"})) {
            Assert.assertNotEquals((Object) null, document.getBinary("key"));
            Assert.assertNotEquals((Object) null, document.getBinary("value"));
            boolean z = false;
            try {
                document.getInt("partition");
            } catch (NoSuchElementException e) {
                z = true;
            }
            Assert.assertTrue(z);
            boolean z2 = false;
            try {
                document.getLong("offset");
            } catch (NoSuchElementException e2) {
                z2 = true;
            }
            Assert.assertTrue(z2);
            Assert.assertNotEquals((Object) null, document.getString("producer"));
            Assert.assertEquals((Object) null, document.getString("topic"));
        }
    }

    @Test
    public void testProjectionProducer() throws Exception {
        int i = 0;
        for (Document document : Streams.getMessageStore(STREAM).find(new String[]{"producer"})) {
            Assert.assertEquals((Object) null, document.getBinary("key"));
            Assert.assertEquals((Object) null, document.getBinary("value"));
            boolean z = false;
            try {
                document.getInt("partition");
            } catch (NoSuchElementException e) {
                z = true;
            }
            Assert.assertTrue(z);
            boolean z2 = false;
            try {
                document.getLong("offset");
            } catch (NoSuchElementException e2) {
                z2 = true;
            }
            Assert.assertTrue(z2);
            Assert.assertNotEquals((Object) null, document.getString("producer"));
            Assert.assertEquals((Object) null, document.getString("topic"));
            i++;
        }
        Assert.assertEquals(totalMsgs, i);
    }
}
