package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/listener/ListenerPauseResumeWakeupTest.class */
public class ListenerPauseResumeWakeupTest extends BaseTest {
    // SLF4J logger bound to this test class.
    private static final Logger _logger = LoggerFactory.getLogger(ListenerPauseResumeWakeupTest.class);
    // Path of the stream used by the tests: "/jtest-" followed by this class's simple name.
    private static final String STREAM = "/jtest-" + ListenerPauseResumeWakeupTest.class.getSimpleName();
    // Admin client; assigned in setupTestClass() and used again in cleanupTestClass().
    private static Admin madmin;
    // Default partition count set on the stream descriptor; also used as a loop bound by the tests.
    private static final int numPartitions = 10;

    /**
     * Runnable that issues a single, very long {@code poll} on the supplied consumer and
     * remembers whether that call ended with an exception, and whether the exception was a
     * {@link WakeupException}.
     */
    public class PollForLongTime implements Runnable {
        private KafkaConsumer consumer;
        private boolean interrupted = false;
        private boolean correctTypeOfException = false;

        /**
         * @param kafkaConsumer consumer on which {@link #run()} will call {@code poll}
         */
        public PollForLongTime(KafkaConsumer kafkaConsumer) {
            consumer = kafkaConsumer;
        }

        /**
         * Calls {@code poll(1000000000L)} once. Any exception is caught and recorded rather
         * than propagated.
         */
        @Override
        public void run() {
            try {
                System.out.println("Polling...");
                consumer.poll(1000000000L);
                System.out.println("Done polling...");
            } catch (WakeupException wakeup) {
                noteAborted(wakeup);
                correctTypeOfException = true;
            } catch (Exception other) {
                noteAborted(other);
            }
        }

        /** Prints the exception that ended the poll and flags the poll as interrupted. */
        private void noteAborted(Exception cause) {
            System.out.println("Interrupted polling thread with wake up, " + cause);
            interrupted = true;
        }

        /**
         * @return {@code true} only if the poll ended with an exception and that exception
         *         was a {@link WakeupException}
         */
        public boolean verify() {
            if (!interrupted) {
                return false;
            }
            return correctTypeOfException;
        }
    }

    /**
     * Creates the admin client and a fresh {@link #STREAM} with {@link #numPartitions}
     * default partitions before any test in this class runs.
     *
     * @throws Exception if the admin client or the stream cannot be created
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception e) {
            // Best-effort removal of a stream left over from an earlier run. A failure here
            // (presumably because the stream does not exist) must not abort the setup, but
            // it is logged so that an unexpected cause is still visible.
            _logger.debug("Ignoring failure to delete pre-existing stream " + STREAM, e);
        }
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numPartitions);
        madmin.createStream(STREAM, descriptor);
    }

    /**
     * Deletes {@link #STREAM} and releases the admin client after all tests have run.
     *
     * @throws Exception if the stream cannot be deleted or the admin client fails to close
     */
    @AfterClass
    public static void cleanupTestClass() throws Exception {
        // JUnit runs @AfterClass even when @BeforeClass threw, in which case the admin
        // client may never have been assigned.
        if (madmin == null) {
            return;
        }
        try {
            madmin.deleteStream(STREAM);
        } finally {
            // Release the admin client even if the delete failed.
            madmin.close();
        }
    }

    /**
     * Checks that paused partitions are skipped by {@code poll} and served again after
     * {@code resume}.
     *
     * <ol>
     *   <li>Send one record to each partition and expect a poll to return records from
     *       every partition.</li>
     *   <li>Send a second record to each partition, pause partitions 0-4 and expect a poll
     *       to return records from partitions 5-9 only.</li>
     *   <li>Resume partitions 0-4 and expect a poll to return records from exactly those
     *       partitions.</li>
     * </ol>
     *
     * The producer and consumer are closed even when an assertion fails.
     *
     * @throws IOException if the UTF-8 encoding of a record key or value is unavailable
     */
    @Test
    public void testPauseResume() throws IOException {
        final String topic = STREAM + ":producertest";
        final int pausedCount = 5;
        Marlinserver.MarlinConfigDefaults defaults = Marlinserver.MarlinConfigDefaults.getDefaultInstance();

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put(defaults.getParallelFlushersPerPartition(), "false");

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put(defaults.getMetadataMaxAge(), 3000);

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(producerProps);
        try {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerProps);
            try {
                // Round 1: nothing paused, every partition must be delivered.
                sendOneRecordPerPartition(producer, topic);
                ArrayList<String> topics = new ArrayList<String>(1);
                topics.add(topic);
                consumer.subscribe(topics);
                Assert.assertEquals("records expected from every partition",
                        partitionRange(0, numPartitions), partitionsIn(consumer.poll(1000L)));

                // Round 2: the first pausedCount partitions are paused and must be skipped.
                sendOneRecordPerPartition(producer, topic);
                TopicPartition[] paused = new TopicPartition[pausedCount];
                for (int p = 0; p < pausedCount; p++) {
                    paused[p] = new TopicPartition(topic, p);
                }
                consumer.pause(paused);
                Assert.assertEquals("records expected from the non-paused partitions only",
                        partitionRange(pausedCount, numPartitions), partitionsIn(consumer.poll(1000L)));

                // Round 3: after resume, the records held back in round 2 must arrive.
                consumer.resume(paused);
                Assert.assertEquals("records expected from the resumed partitions only",
                        partitionRange(0, pausedCount), partitionsIn(consumer.poll(1000L)));
            } finally {
                consumer.close();
            }
        } finally {
            producer.close();
        }
    }

    /** Sends one record to each of the {@code numPartitions} partitions of {@code topic} and flushes. */
    private static void sendOneRecordPerPartition(KafkaProducer<byte[], byte[]> producer, String topic)
            throws IOException {
        for (int p = 0; p < numPartitions; p++) {
            producer.send(new ProducerRecord<byte[], byte[]>(topic, p,
                    ("key-value" + p).getBytes("UTF-8"), ("msg-value" + p).getBytes("UTF-8")));
        }
        producer.flush();
    }

    /** Returns the set of partition numbers that occur in {@code records}. */
    private static HashSet<Integer> partitionsIn(ConsumerRecords<byte[], byte[]> records) {
        HashSet<Integer> seen = new HashSet<Integer>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            seen.add(record.partition());
        }
        return seen;
    }

    /** Returns the set of integers from {@code from} (inclusive) to {@code to} (exclusive). */
    private static HashSet<Integer> partitionRange(int from, int to) {
        HashSet<Integer> range = new HashSet<Integer>();
        for (int p = from; p < to; p++) {
            range.add(p);
        }
        return range;
    }

    /**
     * Checks that {@code wakeup()} aborts a long-running {@code poll} on another thread
     * with a {@link WakeupException}.
     *
     * The wait for the polling thread is bounded. If {@code wakeup()} does not work, the
     * test fails after the timeout instead of blocking for the full poll duration.
     *
     * @throws Exception if the test thread is interrupted or the consumer fails
     */
    @Test
    public void testWakeup() throws Exception {
        final long joinTimeoutMs = 30000L;
        Marlinserver.MarlinConfigDefaults defaults = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put(defaults.getMetadataMaxAge(), 3000);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerProps);
        Thread pollThread = null;
        try {
            ArrayList<String> topics = new ArrayList<String>(1);
            topics.add(STREAM + ":RANDOMTOPIC");
            consumer.subscribe(topics);

            PollForLongTime poller = new PollForLongTime(consumer);
            pollThread = new Thread(poller);
            // Daemon, so a poll that never returns cannot keep the JVM alive after the run.
            pollThread.setDaemon(true);
            pollThread.start();

            // Give the polling thread time to enter poll() before waking it up.
            Thread.sleep(1000L);
            consumer.wakeup();

            pollThread.join(joinTimeoutMs);
            Assert.assertFalse("poll() did not return within " + joinTimeoutMs + " ms of wakeup()",
                    pollThread.isAlive());
            Assert.assertTrue("poll() was not aborted with a WakeupException", poller.verify());
        } finally {
            // Only close when the polling thread is no longer using the consumer; closing
            // it concurrently with a running poll() would be an unsafe cross-thread access.
            if (pollThread == null || !pollThread.isAlive()) {
                consumer.close();
            }
        }
    }

    /**
     * Repeatedly pauses partition 0, sends one record to it, resumes it and polls,
     * expecting each poll to return exactly the one record just sent.
     *
     * Keys are encoded and decoded with the same explicit charset (UTF-8), and the producer
     * and consumer are closed even when an assertion fails.
     *
     * NOTE(review): this test uses the same topic as testPauseResume and reads from the
     * earliest offset; confirm that records left by that test cannot be delivered here.
     *
     * @throws IOException if the UTF-8 encoding is unavailable
     */
    @Test
    public void testPauseResumeSameFeed() throws IOException {
        final String topic = STREAM + ":producertest";
        Marlinserver.MarlinConfigDefaults defaults = Marlinserver.MarlinConfigDefaults.getDefaultInstance();

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put(defaults.getParallelFlushersPerPartition(), "false");

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("fetch.min.bytes", "1");
        consumerProps.put(defaults.getMetadataMaxAge(), 2000);

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(producerProps);
        try {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerProps);
            try {
                ArrayList<String> topics = new ArrayList<String>(1);
                topics.add(topic);
                consumer.subscribe(topics);
                TopicPartition[] partitionZero = {new TopicPartition(topic, 0)};

                // numPartitions is reused here as the number of rounds; every record goes
                // to partition 0.
                for (int round = 0; round < numPartitions; round++) {
                    String expectedKey = "key-value" + round;

                    // The record is produced while the partition is paused and must be
                    // delivered by the first poll after resume.
                    consumer.pause(partitionZero);
                    producer.send(new ProducerRecord<byte[], byte[]>(topic, 0,
                            expectedKey.getBytes("UTF-8"), ("msg-value" + round).getBytes("UTF-8")));
                    consumer.resume(partitionZero);
                    producer.flush();

                    int received = 0;
                    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(5000L)) {
                        Assert.assertEquals("unexpected record key in round " + round,
                                expectedKey, new String(record.key(), "UTF-8"));
                        received++;
                    }
                    Assert.assertEquals("records returned by poll in round " + round, 1, received);
                    consumer.pause(partitionZero);
                }
            } finally {
                consumer.close();
            }
        } finally {
            producer.close();
        }
    }
}
