package com.mapr.streams.tests.listener;

import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.admin.TopicFeedInfo;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster tests that produce messages to a topic of the {@code /jtest-OffsetsGapTest} stream and
 * then verify that the offsets reported afterwards (consumer end position, offset of the last
 * polled record, or the topic's max sequence) match the number of messages produced.
 *
 * <p>Each test runs against a freshly created stream (see {@link #setupTest()}) which is deleted
 * again in {@link #cleanupTest()}. Several tests shell out to cluster tools ({@code maprcli},
 * {@code mrconfig}, {@code service mapr-warden restart}) and therefore need a live cluster.
 *
 * <p>NOTE(review): the test names refer to tablet splits and bucket switches; the code only sets
 * the region size / {@code streams.saveseq.maxmsgs} and produces data, so presumably those
 * settings are what provoke the scenario — confirm against the server documentation.
 */
@Category({ClusterTest.class})
public class OffsetsGapTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(OffsetsGapTest.class);
    private static final String STREAM = "/jtest-" + OffsetsGapTest.class.getSimpleName();
    private static Admin madmin;
    private static final int numParts = 1;
    private static final int defaultNumParts = 5;
    private static final int numMsgs = 1000000;
    private static final int numTopics = 1;
    // Kept as a raw type: it is handed to SendMessagesToProducer, whose generic signature is not
    // visible from this file.
    private KafkaProducer kafkaProducer;
    private KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final String topic = "t";
    private final String topicName = ":t";
    // Guarded by the lock shared between testSeekDuringTabletSplit and its Seeker.
    private boolean produceDone = false;

    /**
     * Background task that keeps seeking the consumer to the end of every partition while the
     * test thread produces, and, once production is flagged as done, checks that the final end
     * position equals {@code numMsgs}.
     *
     * <p>Because it runs on its own thread, any failure (including assertion errors) is captured
     * and must be surfaced by the test thread through {@link #rethrowFailure()}.
     */
    private class Seeker implements Runnable {
        private final List<TopicPartition> topicPartitionList;
        private final Lock lock;
        // Written by the seeker thread, read by the test thread after join().
        private volatile Throwable failure;

        /**
         * @param lock lock guarding {@code produceDone}; the same instance must be used by the
         *     thread that sets the flag
         */
        public Seeker(Lock lock) {
            this.topicPartitionList = OffsetsGapTest.this.topicPartitions();
            this.lock = lock;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                OffsetsGapTest.this.kafkaConsumer.assign(this.topicPartitionList);
                // Seek at least once, then keep going until the producer side reports completion.
                do {
                    for (TopicPartition topicPartition : this.topicPartitionList) {
                        OffsetsGapTest._logger.info("endPos {}", OffsetsGapTest.this.seekToEndPosition(topicPartition));
                    }
                } while (!isProduceDone());
                for (TopicPartition topicPartition : this.topicPartitionList) {
                    long position = OffsetsGapTest.this.seekToEndPosition(topicPartition);
                    OffsetsGapTest._logger.info("endPos {}", position);
                    Assert.assertEquals("End position of " + topicPartition, (long) numMsgs, position);
                }
            } catch (Throwable t) {
                // An exception thrown here would otherwise only terminate this thread and the
                // test would still pass; remember it for the test thread instead.
                this.failure = t;
            }
        }

        /** Reads {@code produceDone} under the shared lock. */
        private boolean isProduceDone() {
            this.lock.lock();
            try {
                return OffsetsGapTest.this.produceDone;
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Rethrows whatever {@link #run()} failed with, if anything. Call after joining the
         * seeker thread.
         *
         * @throws Exception the exception captured on the seeker thread
         */
        void rethrowFailure() throws Exception {
            Throwable captured = this.failure;
            if (captured instanceof Error) {
                throw (Error) captured;
            }
            if (captured instanceof Exception) {
                throw (Exception) captured;
            }
            if (captured != null) {
                throw new RuntimeException(captured);
            }
        }
    }

    /**
     * Creates the shared admin client and removes any stream left over from a previous run.
     *
     * @throws Exception if the admin client cannot be created
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception ignored) {
            // Best effort: the delete is allowed to fail (e.g. nothing to delete).
            _logger.debug("Ignoring failure to delete stream " + STREAM + " before the tests", ignored);
        }
    }

    /**
     * Creates the test stream with {@code defaultNumParts} default partitions, sets its region
     * size to 256 MB through {@code maprcli}, and creates a byte-array producer and consumer.
     *
     * @throws Exception if the stream, the external command, or the clients cannot be set up
     */
    @Before
    public void setupTest() throws Exception {
        StreamDescriptor streamDescriptor = Streams.newStreamDescriptor();
        streamDescriptor.setDefaultPartitions(defaultNumParts);
        madmin.createStream(STREAM, streamDescriptor);
        _logger.info("Set region size, completed with return code " + runCommand("maprcli", "table", "edit", "-path", STREAM, "-regionsizemb", "256"));

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.kafkaProducer = new KafkaProducer(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.min.bytes", "1");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("group.id", "mygroup");
        this.kafkaConsumer = new KafkaConsumer<byte[], byte[]>(consumerProps);
    }

    /**
     * Closes the per-test producer and consumer and deletes the test stream. The stream is
     * deleted even when closing a client fails.
     *
     * @throws Exception if closing a client or deleting the stream fails
     */
    @After
    public void cleanupTest() throws Exception {
        try {
            // Null checks: this method also runs when setupTest() failed part-way through.
            if (this.kafkaProducer != null) {
                this.kafkaProducer.close();
            }
        } finally {
            try {
                if (this.kafkaConsumer != null) {
                    this.kafkaConsumer.close();
                }
            } finally {
                madmin.deleteStream(STREAM);
            }
        }
    }

    /**
     * Produces {@code numMsgs} messages and checks that the end position of every partition
     * equals {@code numMsgs}.
     */
    @Test
    public void testGapsOnTabletSplit() throws Exception {
        new SendMessagesToProducer(this.kafkaProducer, new ProducerMultiTest.CountCallback(numMsgs), STREAM + this.topicName, 1, numMsgs, 20000).run();
        assertEndPositions("numMsgs", numMsgs);
    }

    /**
     * Produces {@code numMsgs} messages while a {@link Seeker} thread continuously seeks to the
     * end of the topic, then checks (on the seeker thread) that the final end position equals
     * {@code numMsgs}.
     */
    @Test
    public void testSeekDuringTabletSplit() throws Exception {
        ReentrantLock doneLock = new ReentrantLock();
        madmin.createTopic(STREAM, this.topic);
        Seeker seeker = new Seeker(doneLock);
        Thread seekerThread = new Thread(seeker);
        seekerThread.start();
        try {
            new SendMessagesToProducer(this.kafkaProducer, new ProducerMultiTest.CountCallback(numMsgs), STREAM + this.topicName, 1, numMsgs, 20000).run();
        } finally {
            // Always release the seeker, even when producing failed; otherwise its loop never
            // ends and the non-daemon thread keeps the test run alive.
            doneLock.lock();
            try {
                this.produceDone = true;
            } finally {
                doneLock.unlock();
            }
            seekerThread.join();
        }
        seeker.rethrowFailure();
    }

    /**
     * Produces {@code numMsgs} messages, polls until {@code numMsgs} records have been received
     * and checks that the offset of the last one is {@code numMsgs - 1}.
     */
    @Test
    public void testPollOnTabletSplit() throws Exception {
        new SendMessagesToProducer(this.kafkaProducer, new ProducerMultiTest.CountCallback(numMsgs), STREAM + this.topicName, 1, numMsgs, 20000).run();
        this.kafkaConsumer.assign(topicPartitions());
        int received = 0;
        long lastOffset = -1L;
        // Keep polling until the numMsgs-th record shows up; a batch that ends early simply
        // leads to the next poll.
        while (received < numMsgs) {
            for (ConsumerRecord<byte[], byte[]> record : this.kafkaConsumer.poll(100L)) {
                received++;
                if (received == numMsgs) {
                    lastOffset = record.offset();
                    break;
                }
            }
        }
        Assert.assertEquals("Offset of the last polled record", (long) (numMsgs - 1), lastOffset);
    }

    /**
     * Produces 100 messages, restarts {@code mapr-warden} and checks that the end position of
     * every partition is still 100.
     */
    @Test
    public void testSeekAfterRestart() throws Exception {
        final int localNumMsgs = 100;
        new SendMessagesToProducer(this.kafkaProducer, new ProducerMultiTest.CountCallback(localNumMsgs), STREAM + this.topicName, 1, localNumMsgs, 20000).run();
        runCommand("sudo", "service", "mapr-warden", "restart");
        assertEndPositions("localNumMsgs", localNumMsgs);
    }

    /**
     * Produces 10000 messages while {@code streams.saveseq.maxmsgs} is set to 8193, sets it to
     * 524288 afterwards, and checks that the end position of every partition is 10000.
     */
    @Test
    public void testSeekAfterBucketSwitch() throws Exception {
        final int localNumMsgs = 10000;
        ProducerMultiTest.CountCallback countCallback = new ProducerMultiTest.CountCallback(localNumMsgs);
        _logger.info("Set streams.saveseq.maxmsgs, completed with return code " + runCommand("/opt/mapr/server/mrconfig", "set", "config", "streams.saveseq.maxmsgs", "8193"));
        try {
            new SendMessagesToProducer(this.kafkaProducer, countCallback, STREAM + this.topicName, 1, localNumMsgs, 100).run();
        } finally {
            // Put the setting back even when producing fails so later tests are not affected.
            // NOTE(review): 524288 is presumably the server default — confirm.
            _logger.info("Set streams.saveseq.maxmsgs, completed with return code " + runCommand("/opt/mapr/server/mrconfig", "set", "config", "streams.saveseq.maxmsgs", "524288"));
        }
        assertEndPositions("localNumMsgs", localNumMsgs);
    }

    /**
     * Produces 100 messages, restarts {@code mapr-warden} and checks that the stream lists at
     * least one topic and that every listed feed reports a max sequence of 99.
     */
    @Test
    public void testListTopicsAfterRestart() throws Exception {
        final int localNumMsgs = 100;
        new SendMessagesToProducer(this.kafkaProducer, new ProducerMultiTest.CountCallback(localNumMsgs), STREAM + this.topicName, defaultNumParts, localNumMsgs, 20000).run();
        runCommand("sudo", "service", "mapr-warden", "restart");
        Map<?, ?> topics = madmin.listTopicsForStream(STREAM);
        Assert.assertFalse("No topics listed for stream " + STREAM, topics.isEmpty());
        for (Object feeds : topics.values()) {
            for (Object feed : (List<?>) feeds) {
                long maxSeq = ((TopicFeedInfo) feed).stat().getMaxSeq();
                _logger.info("statmaxseq {} localNumMsgs {}", maxSeq, localNumMsgs);
                Assert.assertEquals("Max sequence of topic feed", (long) (localNumMsgs - 1), maxSeq);
            }
        }
    }

    /** Builds the list of the {@code numParts} partitions of the test topic. */
    private List<TopicPartition> topicPartitions() {
        List<TopicPartition> partitions = new ArrayList<TopicPartition>();
        for (int i = 0; i < numParts; i++) {
            partitions.add(new TopicPartition(STREAM + this.topicName, i));
        }
        return partitions;
    }

    /** Seeks the consumer to the end of {@code topicPartition} and returns the resulting position. */
    private long seekToEndPosition(TopicPartition topicPartition) {
        this.kafkaConsumer.seekToEnd(new TopicPartition[]{topicPartition});
        return this.kafkaConsumer.position(topicPartition);
    }

    /**
     * Assigns all partitions of the test topic to the consumer and asserts that the end position
     * of each one equals {@code expected}.
     *
     * @param countLabel name of the expected-count variable, used only in the log line
     * @param expected expected end position
     */
    private void assertEndPositions(String countLabel, long expected) {
        List<TopicPartition> partitions = topicPartitions();
        this.kafkaConsumer.assign(partitions);
        for (TopicPartition topicPartition : partitions) {
            long position = seekToEndPosition(topicPartition);
            _logger.info("endPos {} {} {}", position, countLabel, expected);
            Assert.assertEquals("End position of " + topicPartition, expected, position);
        }
    }

    /**
     * Runs an external command and waits for it to finish.
     *
     * @param command program and arguments
     * @return the exit code of the process
     * @throws Exception if the process cannot be started or the wait is interrupted
     */
    private static int runCommand(String... command) throws Exception {
        return new ProcessBuilder(command).start().waitFor();
    }
}
