package com.mapr.streams.tests.listener;

import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.MarlinAdmin;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster test covering producer-reported offsets and consumer seeks.
 *
 * <p>A stream with a single default partition is created once for the class. Each test
 * produces a batch of messages to its own topic in that stream, followed by one
 * {@code "terminator"} message, and then checks either the offsets returned by the
 * producer or the number of messages consumed after seeking to a given offset.
 */
@Category({ClusterTest.class})
public class OffsetTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(OffsetTest.class);
    private static final String STREAM = "/jtest-" + OffsetTest.class.getSimpleName();
    /** Number of default partitions the test stream is created with. */
    private static final int NUM_PARTITIONS = 1;
    /** Value of the marker message sent after every batch so the consumer knows when to stop. */
    private static final String TERMINATOR = "terminator";

    private static MarlinAdmin madmin;
    private static KafkaProducer<String, String> producer;
    private static KafkaConsumer<String, String> consumer;

    /**
     * Recreates the test stream and builds the shared producer and consumer.
     *
     * @throws Exception if the stream cannot be created or a client cannot be constructed
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = new MarlinAdmin(new Configuration());
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception e) {
            // Best-effort removal of a leftover stream; a failure here must not abort the
            // setup, but it is logged so that real problems remain visible.
            _logger.debug("pre-test delete of stream {} failed, continuing", STREAM, e);
        }
        MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(NUM_PARTITIONS);
        madmin.createStream(STREAM, descriptor);

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("streams.parallel.flushers.per.partition", false);
        producer = new KafkaProducer<>(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(consumerProps);
    }

    /**
     * Closes the clients and deletes the test stream.
     *
     * <p>Each step runs even if the previous one throws, and fields that were never
     * assigned (because setup failed part-way) are skipped.
     *
     * @throws Exception if closing a client or deleting the stream fails
     */
    @AfterClass
    public static void cleanupTest() throws Exception {
        try {
            if (producer != null) {
                producer.close();
            }
        } finally {
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } finally {
                if (madmin != null) {
                    madmin.deleteStream(STREAM);
                }
            }
        }
    }

    /**
     * Sends {@code numMsgs} messages of {@code msgSize} characters to {@code topic}, then a
     * single terminator message, and returns the offsets the producer reported.
     *
     * @param topic   full topic name to produce to
     * @param msgSize number of characters in each message value
     * @param numMsgs number of messages to send before the terminator
     * @return the offset of each non-terminator message, in send order
     * @throws Exception if a send fails or waiting on its result is interrupted
     */
    public List<Long> produceMsgs(String topic, int msgSize, int numMsgs) throws Exception {
        StringBuilder payloadBuilder = new StringBuilder(msgSize);
        for (int c = 0; c < msgSize; c++) {
            payloadBuilder.append("R");
        }
        String payload = payloadBuilder.toString();

        List<Future<RecordMetadata>> pending = new ArrayList<>();
        for (int m = 0; m < numMsgs; m++) {
            pending.add(producer.send(new ProducerRecord<>(topic, Integer.toString(m), payload)));
        }
        producer.flush();
        // The terminator is sent only after the batch has been flushed.
        producer.send(new ProducerRecord<>(topic, "", TERMINATOR));
        producer.flush();

        List<Long> offsets = new ArrayList<>(numMsgs);
        for (Future<RecordMetadata> result : pending) {
            offsets.add(result.get().offset());
        }
        _logger.info("produced {} msgs, each of sz {}", numMsgs, msgSize);
        return offsets;
    }

    /**
     * Seeks partition 0 of {@code topic} to {@code startOffset} and counts the messages read
     * before the terminator message.
     *
     * <p>The poll loop has no timeout: it keeps polling until a terminator is seen.
     *
     * @param topic       full topic name to consume from
     * @param startOffset offset to seek to before consuming
     * @return number of messages read before the terminator (the terminator is not counted)
     * @throws Exception if the consumer fails
     */
    public int consumeMsgs(String topic, long startOffset) throws Exception {
        TopicPartition partition = new TopicPartition(topic, 0);
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(partition);
        consumer.assign(partitions);
        consumer.seek(partition, startOffset);

        int consumed = 0;
        boolean terminatorSeen = false;
        while (!terminatorSeen) {
            for (ConsumerRecord<String, String> record : consumer.poll(10L)) {
                if (TERMINATOR.equals(record.value())) {
                    terminatorSeen = true;
                    break;
                }
                consumed++;
            }
        }
        consumer.unsubscribe();
        return consumed;
    }

    @Test
    public void testProducerOffsetsSmallMsgs() throws Exception {
        int numMsgs = 2000;
        assertOffsetsStrictlyIncreasing(produceMsgs(STREAM + ":psmall", 1, numMsgs), numMsgs);
    }

    @Test
    public void testProducerOffsetsLargeMsgs() throws Exception {
        int numMsgs = 1000;
        assertOffsetsStrictlyIncreasing(produceMsgs(STREAM + ":plarge", 1024, numMsgs), numMsgs);
    }

    @Test
    public void testSeeksWithSmallMsgs() throws Exception {
        verifySeeks(STREAM + ":small", 1, 200);
    }

    @Test
    public void testSeeksWithLargeMsgs() throws Exception {
        verifySeeks(STREAM + ":large", 1000, 100);
    }

    /** Asserts that {@code offsets} has {@code expectedCount} entries, each greater than the one before. */
    private static void assertOffsetsStrictlyIncreasing(List<Long> offsets, int expectedCount) {
        Assert.assertEquals("number of offsets returned by the producer", expectedCount, offsets.size());
        for (int idx = 1; idx < offsets.size(); idx++) {
            long previous = offsets.get(idx - 1);
            long current = offsets.get(idx);
            Assert.assertTrue(
                    "offset " + current + " at index " + idx + " is not greater than previous offset " + previous,
                    current > previous);
        }
    }

    /**
     * Produces a batch to {@code topic}, then checks that consuming from offset 0 and from
     * each produced offset yields exactly the messages from that point up to the terminator.
     */
    private void verifySeeks(String topic, int msgSize, int numMsgs) throws Exception {
        List<Long> offsets = produceMsgs(topic, msgSize, numMsgs);

        int fromStart = consumeMsgs(topic, 0L);
        _logger.info("consumed {} msgs starting from offset 0", fromStart);
        Assert.assertEquals("msgs consumed starting from offset 0", numMsgs, fromStart);

        for (int idx = 0; idx < numMsgs; idx++) {
            long offset = offsets.get(idx);
            int consumed = consumeMsgs(topic, offset);
            Assert.assertEquals("msgs consumed starting from offset " + offset, numMsgs - idx, consumed);
        }
    }
}
