/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.MarlinAdmin;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster test that checks the offsets handed out by the streams producer and
 * the behaviour of consumer seeks against those offsets.
 *
 * <p>Every test produces a batch of equally sized messages followed by a single
 * "terminator" record into its own topic of the shared test stream. The
 * producer tests check that the returned offsets are strictly increasing; the
 * seek tests check that seeking to the offset of the i-th message yields
 * exactly the remaining messages before the terminator.
 */
@Category(ClusterTest.class)
public class OffsetTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(OffsetTest.class);
    private static final String STREAM = "/jtest-" + OffsetTest.class.getSimpleName();
    /** Value of the sentinel record that marks the end of a produced batch. */
    private static final String TERMINATOR = "terminator";
    /** Timeout handed to each individual {@code poll} call, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 10L;
    /**
     * Maximum time {@link #consumeMsgs} waits without receiving any record
     * before failing instead of polling forever.
     */
    private static final long CONSUME_IDLE_TIMEOUT_MS = 60000L;
    private static final int numParts = 1;
    private static MarlinAdmin madmin;
    private static KafkaProducer<String, String> producer;
    private static KafkaConsumer<String, String> consumer;

    /**
     * Recreates the test stream and builds the shared producer and consumer.
     *
     * @throws Exception if the stream or either client cannot be created
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = new MarlinAdmin(new Configuration());
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception ignored) {
            // Best effort: a failure here (e.g. nothing left over from an
            // earlier run) must not abort the setup.
            _logger.debug("ignoring failure to delete stream {} before setup", STREAM, ignored);
        }
        MStreamDescriptor sdesc = new MStreamDescriptor();
        sdesc.setDefaultPartitions(numParts);
        madmin.createStream(STREAM, sdesc);

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("streams.parallel.flushers.per.partition", false);
        producer = new KafkaProducer<String, String>(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<String, String>(consumerProps);
    }

    /**
     * Closes the clients and deletes the test stream.
     *
     * <p>Each step runs even if an earlier one throws, and every step is
     * skipped for a resource that was never created, so a partially failed
     * setup does not surface here as a NullPointerException.
     *
     * @throws Exception if closing a client or deleting the stream fails
     */
    @AfterClass
    public static void cleanupTest() throws Exception {
        try {
            if (producer != null) {
                producer.close();
            }
        }
        finally {
            try {
                if (consumer != null) {
                    consumer.close();
                }
            }
            finally {
                if (madmin != null) {
                    madmin.deleteStream(STREAM);
                }
            }
        }
    }

    /**
     * Produces {@code nMsgs} messages of {@code msgSz} characters each to the
     * given topic, followed by one terminator record.
     *
     * @param topicName full topic name to produce to
     * @param msgSz number of characters in every message value
     * @param nMsgs number of messages to produce before the terminator
     * @return the offsets reported for the produced messages, in send order
     *         (the terminator's offset is not included)
     * @throws Exception if any send fails
     */
    public List<Long> produceMsgs(String topicName, int msgSz, int nMsgs) throws Exception {
        StringBuilder payload = new StringBuilder(msgSz);
        for (int i = 0; i < msgSz; ++i) {
            payload.append('R');
        }
        String msgValue = payload.toString();

        List<Future<RecordMetadata>> pending = new ArrayList<Future<RecordMetadata>>(nMsgs);
        for (int i = 0; i < nMsgs; ++i) {
            pending.add(producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), msgValue)));
        }
        producer.flush();

        Future<RecordMetadata> terminatorAck =
                producer.send(new ProducerRecord<String, String>(topicName, "", TERMINATOR));
        producer.flush();
        // Surface a failed terminator send here; otherwise the consumer side
        // would wait for a record that never arrives.
        terminatorAck.get();

        List<Long> offsetsList = new ArrayList<Long>(nMsgs);
        for (Future<RecordMetadata> ack : pending) {
            offsetsList.add(ack.get().offset());
        }
        _logger.info("produced {} msgs, each of sz {}", nMsgs, msgSz);
        return offsetsList;
    }

    /**
     * Seeks partition 0 of the topic to {@code offset} and counts the records
     * read until the terminator record is seen.
     *
     * <p>The consumer's assignment is cleared before returning, whether the
     * method succeeds or fails.
     *
     * @param topicName full topic name to consume from
     * @param offset offset to seek to before polling
     * @return number of records read before the terminator
     * @throws Exception if assigning, seeking or polling fails
     */
    public int consumeMsgs(String topicName, long offset) throws Exception {
        TopicPartition p0 = new TopicPartition(topicName, 0);
        List<TopicPartition> plist = new ArrayList<TopicPartition>();
        plist.add(p0);
        consumer.assign(plist);
        try {
            consumer.seek(p0, offset);
            int numMsgs = 0;
            long lastProgress = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : batch) {
                    if (TERMINATOR.equals(record.value())) {
                        // Anything after the terminator in this batch is not counted.
                        return numMsgs;
                    }
                    ++numMsgs;
                    lastProgress = System.currentTimeMillis();
                }
                if (System.currentTimeMillis() - lastProgress > CONSUME_IDLE_TIMEOUT_MS) {
                    Assert.fail("no terminator received from " + topicName + " after seeking to offset " + offset
                            + "; consumed " + numMsgs + " msgs before going idle for "
                            + CONSUME_IDLE_TIMEOUT_MS + " ms");
                }
            }
        }
        finally {
            consumer.unsubscribe();
        }
    }

    /** Produced offsets must be strictly increasing for 1-character messages. */
    @Test
    public void testProducerOffsetsSmallMsgs() throws Exception {
        this.verifyProducerOffsets(STREAM + ":psmall", 1, 2000);
    }

    /** Produced offsets must be strictly increasing for 1024-character messages. */
    @Test
    public void testProducerOffsetsLargeMsgs() throws Exception {
        this.verifyProducerOffsets(STREAM + ":plarge", 1024, 1000);
    }

    /** Seeking to each produced offset must work for 1-character messages. */
    @Test
    public void testSeeksWithSmallMsgs() throws Exception {
        this.verifySeeks(STREAM + ":small", 1, 200);
    }

    /** Seeking to each produced offset must work for 1000-character messages. */
    @Test
    public void testSeeksWithLargeMsgs() throws Exception {
        this.verifySeeks(STREAM + ":large", 1000, 100);
    }

    /** Produces a batch and asserts that each offset is greater than the previous one. */
    private void verifyProducerOffsets(String topicName, int msgSz, int numProduced) throws Exception {
        List<Long> offsets = this.produceMsgs(topicName, msgSz, numProduced);
        for (int i = 1; i < numProduced; ++i) {
            long previous = offsets.get(i - 1);
            long current = offsets.get(i);
            Assert.assertTrue("offset of msg " + i + " (" + current + ") is not greater than offset of msg "
                    + (i - 1) + " (" + previous + ")", current > previous);
        }
    }

    /**
     * Produces a batch, then checks the number of records read from offset 0
     * and from the offset of every produced message.
     */
    private void verifySeeks(String topicName, int msgSz, int numProduced) throws Exception {
        List<Long> offsets = this.produceMsgs(topicName, msgSz, numProduced);

        int numConsumed = this.consumeMsgs(topicName, 0L);
        _logger.info("consumed {} msgs starting from offset 0", numConsumed);
        Assert.assertEquals("consumed " + numConsumed + " msgs starting from offset 0, expected " + numProduced,
                numProduced, numConsumed);

        for (int i = 0; i < numProduced; ++i) {
            long startOffset = offsets.get(i);
            int expected = numProduced - i;
            numConsumed = this.consumeMsgs(topicName, startOffset);
            Assert.assertEquals("consumed " + numConsumed + " msgs starting from offset " + startOffset
                    + ", expected " + expected, expected, numConsumed);
        }
    }
}

