/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.admin.MarlinAdminImpl;
import com.mapr.streams.impl.admin.TopicFeedInfo;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster tests that produce messages to a single-partition topic of a freshly
 * created stream and then verify, through a Kafka consumer or the admin API,
 * that the reported end offsets / sequence numbers match the number of
 * messages produced (i.e. that no offset gaps appeared).
 *
 * <p>Each test gets a new stream (created in {@link #setupTest()}, deleted in
 * {@link #cleanupTest()}) plus its own producer and consumer. Several tests
 * shell out to cluster tooling ({@code maprcli}, {@code mrconfig},
 * {@code service mapr-warden restart}), so they need a live cluster.
 *
 * <p>NOTE(review): judging by the test names and the 256 MB region size set in
 * {@link #setupTest()}, the large message count is presumably meant to force
 * tablet splits while producing -- confirm against the cluster documentation.
 */
@Category(value={ClusterTest.class})
public class OffsetsGapTest
extends BaseTest {
    private static final Logger LOG = LoggerFactory.getLogger(OffsetsGapTest.class);
    private static final String STREAM = "/jtest-" + OffsetsGapTest.class.getSimpleName();
    private static final String MRCONFIG = "/opt/mapr/server/mrconfig";
    private static Admin madmin;
    /** Number of partitions written to and inspected by the tests. */
    private static final int NUM_PARTS = 1;
    /** Default partition count configured on the test stream. */
    private static final int DEFAULT_NUM_PARTS = 5;
    /** Number of messages produced by the large-volume tests. */
    private static final int NUM_MSGS = 1000000;
    // Retained from the original file; its use sites were inlined by the
    // compiler and cannot be recovered reliably from the decompiled code.
    private static final int NUM_TOPICS = 1;
    /**
     * Maximum time {@link #testPollOnTabletSplit()} tolerates without receiving
     * any record before it fails instead of polling forever. The value is a
     * generous, arbitrary bound.
     */
    private static final long POLL_IDLE_TIMEOUT_MS = 120000L;
    private static final String TOPIC = "t";
    /** Full topic path in the {@code <stream>:<topic>} form used by the clients. */
    private static final String TOPIC_PATH = STREAM + ":" + TOPIC;

    // Left raw on purpose: the parameter type expected by SendMessagesToProducer
    // is not visible from this file, and a raw reference is assignable to any
    // parameterization.
    private KafkaProducer kafkaProducer;
    private KafkaConsumer<byte[], byte[]> kafkaConsumer;
    /** Set once producing has finished; read and written only while holding the test's lock. */
    private boolean produceDone = false;

    /**
     * Creates the shared admin client and removes any stream left over from a
     * previous run.
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin(conf);
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception ignored) {
            // Best-effort cleanup: a failure here (for example because the
            // stream does not exist) must not prevent the tests from running.
            LOG.debug("Ignoring failure to delete " + STREAM + " before the test run", ignored);
        }
    }

    /**
     * Creates the test stream, sets its region size to 256 MB through
     * {@code maprcli}, and builds a byte-array producer and consumer.
     */
    @Before
    public void setupTest() throws Exception {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(DEFAULT_NUM_PARTS);
        madmin.createStream(STREAM, sdesc);
        runCommand("Set region size", "maprcli", "table", "edit", "-path", STREAM, "-regionsizemb", "256");

        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.kafkaProducer = new KafkaProducer(props);

        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("group.id", "mygroup");
        this.kafkaConsumer = new KafkaConsumer<byte[], byte[]>(listenerProps);
    }

    /**
     * Closes the per-test clients and deletes the test stream. The stream is
     * deleted even if closing a client throws.
     */
    @After
    public void cleanupTest() throws Exception {
        try {
            // Null checks: this method also runs when setupTest() failed early.
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.close();
            }
        }
        finally {
            try {
                if (this.kafkaProducer != null) {
                    this.kafkaProducer.close();
                }
            }
            finally {
                madmin.deleteStream(STREAM);
            }
        }
    }

    /** Produces {@link #NUM_MSGS} messages and checks that the end offset equals that count. */
    @Test
    public void testGapsOnTabletSplit() throws Exception {
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(NUM_MSGS);
        SendMessagesToProducer producer = new SendMessagesToProducer(this.kafkaProducer, cb, TOPIC_PATH, NUM_PARTS, NUM_MSGS, 20000);
        producer.run();
        assertEndOffsets("numMsgs", NUM_MSGS);
    }

    /**
     * Repeatedly seeks to the end of the partition on a second thread while
     * {@link #NUM_MSGS} messages are produced, then checks the final end offset.
     *
     * <p>Failures raised on the seeker thread are rethrown here; an assertion
     * failing on a non-test thread would otherwise go unnoticed by JUnit.
     */
    @Test
    public void testSeekDuringTabletSplit() throws Exception {
        Lock lock = new ReentrantLock();
        madmin.createTopic(STREAM, TOPIC);
        Seeker seekerTask = new Seeker(lock);
        Thread seeker = new Thread(seekerTask);
        seeker.start();
        try {
            ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(NUM_MSGS);
            SendMessagesToProducer producer = new SendMessagesToProducer(this.kafkaProducer, cb, TOPIC_PATH, NUM_PARTS, NUM_MSGS, 20000);
            producer.run();
        }
        finally {
            // Always release the seeker, even when producing failed; otherwise
            // its loop would never terminate.
            lock.lock();
            try {
                this.produceDone = true;
            }
            finally {
                lock.unlock();
            }
            seeker.join();
        }
        Throwable failure = seekerTask.failure;
        if (failure != null) {
            throw new AssertionError("Seeker thread failed: " + failure, failure);
        }
    }

    /**
     * Produces {@link #NUM_MSGS} messages, polls them all back, and checks that
     * the last record received has offset {@code NUM_MSGS - 1}.
     *
     * <p>Fails (rather than hanging) if no record arrives for
     * {@link #POLL_IDLE_TIMEOUT_MS} milliseconds before all records were read.
     */
    @Test
    public void testPollOnTabletSplit() throws Exception {
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(NUM_MSGS);
        SendMessagesToProducer producer = new SendMessagesToProducer(this.kafkaProducer, cb, TOPIC_PATH, NUM_PARTS, NUM_MSGS, 20000);
        producer.run();
        this.kafkaConsumer.assign(topicPartitions());

        int numRecs = 0;
        long lastOffset = 0L;
        long lastProgressMs = System.currentTimeMillis();
        while (numRecs < NUM_MSGS) {
            ConsumerRecords<byte[], byte[]> recs = this.kafkaConsumer.poll(100L);
            boolean progressed = false;
            for (ConsumerRecord<byte[], byte[]> rec : recs) {
                progressed = true;
                if (++numRecs == NUM_MSGS) {
                    // Only the offset of the final expected record is checked.
                    lastOffset = rec.offset();
                    break;
                }
            }
            long now = System.currentTimeMillis();
            if (progressed) {
                lastProgressMs = now;
            } else if (now - lastProgressMs > POLL_IDLE_TIMEOUT_MS) {
                Assert.fail("Received only " + numRecs + " of " + NUM_MSGS
                        + " records; no new record for " + POLL_IDLE_TIMEOUT_MS + " ms");
            }
        }
        Assert.assertEquals("offset of last record", (long) (NUM_MSGS - 1), lastOffset);
    }

    /** Produces 100 messages, restarts mapr-warden, and checks the end offset afterwards. */
    @Test
    public void testSeekAfterRestart() throws Exception {
        int localNumMsgs = 100;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(localNumMsgs);
        SendMessagesToProducer producer = new SendMessagesToProducer(this.kafkaProducer, cb, TOPIC_PATH, NUM_PARTS, localNumMsgs, 20000);
        producer.run();
        runCommand("Restart mapr-warden", "sudo", "service", "mapr-warden", "restart");
        assertEndOffsets("localNumMsgs", localNumMsgs);
    }

    /**
     * Lowers {@code streams.saveseq.maxmsgs} to 8193, produces 10000 messages,
     * sets the value back to 524288, and checks the end offset.
     *
     * <p>The setting is put back in a {@code finally} block so that a producer
     * failure does not leave the cluster-wide value modified for later tests.
     * NOTE(review): 524288 is presumably the default value -- confirm.
     */
    @Test
    public void testSeekAfterBucketSwitch() throws Exception {
        int localNumMsgs = 10000;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(localNumMsgs);
        runCommand("Set streams.saveseq.maxmsgs", MRCONFIG, "set", "config", "streams.saveseq.maxmsgs", "8193");
        try {
            SendMessagesToProducer producer = new SendMessagesToProducer(this.kafkaProducer, cb, TOPIC_PATH, NUM_PARTS, localNumMsgs, 100);
            producer.run();
        }
        finally {
            runCommand("Set streams.saveseq.maxmsgs", MRCONFIG, "set", "config", "streams.saveseq.maxmsgs", "524288");
        }
        assertEndOffsets("localNumMsgs", localNumMsgs);
    }

    /**
     * Produces 100 messages, restarts mapr-warden, and checks through the admin
     * API that every listed topic feed reports a max sequence of 99.
     */
    @Test
    public void testListTopicsAfterRestart() throws Exception {
        int localNumMsgs = 100;
        int localNumParts = 5;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(localNumMsgs);
        SendMessagesToProducer producer = new SendMessagesToProducer(this.kafkaProducer, cb, TOPIC_PATH, localNumParts, localNumMsgs, 20000);
        producer.run();
        runCommand("Restart mapr-warden", "sudo", "service", "mapr-warden", "restart");

        MarlinAdminImpl admin = (MarlinAdminImpl) madmin;
        // Key type is not visible from this file, hence the wildcard.
        Map<?, List<TopicFeedInfo>> tfMap = admin.listTopicsForStream(STREAM);
        Assert.assertFalse("no topics listed for " + STREAM, tfMap.isEmpty());
        for (List<TopicFeedInfo> feedInfoList : tfMap.values()) {
            for (TopicFeedInfo fi : feedInfoList) {
                long maxSeq = fi.stat().getMaxSeq();
                LOG.info("statmaxseq {} localNumMsgs {}", maxSeq, localNumMsgs);
                Assert.assertEquals("max sequence of topic feed", (long) (localNumMsgs - 1), maxSeq);
            }
        }
    }

    /** Builds the list of the first {@link #NUM_PARTS} partitions of the test topic. */
    private static List<TopicPartition> topicPartitions() {
        List<TopicPartition> partitions = new ArrayList<TopicPartition>(NUM_PARTS);
        for (int i = 0; i < NUM_PARTS; ++i) {
            partitions.add(new TopicPartition(TOPIC_PATH, i));
        }
        return partitions;
    }

    /**
     * Assigns the test partitions to the consumer, seeks each to its end, and
     * asserts that the resulting position equals {@code expected}.
     *
     * @param countLabel label used for the expected count in the log line
     * @param expected   expected end position of every partition
     */
    private void assertEndOffsets(String countLabel, long expected) {
        List<TopicPartition> partitions = topicPartitions();
        this.kafkaConsumer.assign(partitions);
        for (TopicPartition tp : partitions) {
            this.kafkaConsumer.seekToEnd(tp);
            long endPos = this.kafkaConsumer.position(tp);
            LOG.info("endPos {} {} {}", endPos, countLabel, expected);
            Assert.assertEquals("end position of " + tp, expected, endPos);
        }
    }

    /**
     * Runs an external command, waits for it to exit, and logs its exit code.
     * A non-zero exit code is logged but not treated as a failure.
     *
     * @param description text that prefixes the log line
     * @param command     program and arguments
     * @return the exit code of the process
     */
    private static int runCommand(String description, String... command) throws Exception {
        Process process = new ProcessBuilder(command).start();
        int exitCode = process.waitFor();
        LOG.info("{}, completed with return code {}", description, exitCode);
        return exitCode;
    }

    /**
     * Background task that keeps seeking to the end of the test partitions
     * until the enclosing test signals that producing is done, then performs a
     * final seek and asserts that the end position equals {@link #NUM_MSGS}.
     *
     * <p>Anything thrown while running is stored in {@link #failure} so the
     * test thread can rethrow it after joining.
     */
    private class Seeker
    implements Runnable {
        private final List<TopicPartition> topicPartitionList = topicPartitions();
        private final Lock lock;
        /** First failure seen on the seeker thread, or {@code null} if none. */
        private volatile Throwable failure;

        Seeker(Lock l) {
            this.lock = l;
        }

        @Override
        public void run() {
            try {
                OffsetsGapTest.this.kafkaConsumer.assign(this.topicPartitionList);
                // Seek at least once, then keep going until producing is done.
                do {
                    for (TopicPartition tp : this.topicPartitionList) {
                        seekToEndAndLog(tp);
                    }
                } while (!isProduceDone());
                for (TopicPartition tp : this.topicPartitionList) {
                    long endPos = seekToEndAndLog(tp);
                    Assert.assertEquals("end position of " + tp, (long) NUM_MSGS, endPos);
                }
            }
            catch (Throwable t) {
                // Record instead of letting the thread die silently.
                this.failure = t;
            }
        }

        /** Seeks the consumer to the end of {@code tp}, logs and returns the position. */
        private long seekToEndAndLog(TopicPartition tp) {
            OffsetsGapTest.this.kafkaConsumer.seekToEnd(tp);
            long endPos = OffsetsGapTest.this.kafkaConsumer.position(tp);
            LOG.info("endPos " + endPos);
            return endPos;
        }

        /** Reads the produce-done flag while holding the shared lock. */
        private boolean isProduceDone() {
            this.lock.lock();
            try {
                return OffsetsGapTest.this.produceDone;
            }
            finally {
                this.lock.unlock();
            }
        }
    }
}

