/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ListenerPauseResumeWakeupTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ListenerPauseResumeWakeupTest.class);
    private static final String STREAM = "/jtest-" + ListenerPauseResumeWakeupTest.class.getSimpleName();
    private static Admin madmin;
    private static final int numPartitions = 10;

    @BeforeClass
    public static void setupTestClass() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception exception) {
            // empty catch block
        }
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(10);
        madmin.createStream(STREAM, sdesc);
    }

    @AfterClass
    public static void cleanupTestClass() throws Exception {
        madmin.deleteStream(STREAM);
    }

    @Test
    public void testPauseResume() throws IOException {
        int i;
        String topicname = "producertest";
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), "false");
        KafkaProducer producer = new KafkaProducer(props);
        props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put(cdef.getMetadataMaxAge(), (Object)3000);
        KafkaConsumer consumer = new KafkaConsumer(props);
        for (int i2 = 0; i2 < 10; ++i2) {
            String key = "key-value" + i2;
            String msg = "msg-value" + i2;
            ProducerRecord record = new ProducerRecord(STREAM + ":" + topicname, Integer.valueOf(i2), (Object)key.getBytes(), (Object)msg.getBytes());
            producer.send(record);
        }
        producer.flush();
        ArrayList<String> topics = new ArrayList<String>(1);
        topics.add(STREAM + ":" + topicname);
        consumer.subscribe(topics);
        ConsumerRecords recs = consumer.poll(1000L);
        HashSet<Integer> resultSet = new HashSet<Integer>();
        HashSet<Integer> expectedSet = new HashSet<Integer>();
        for (int i3 = 0; i3 < 10; ++i3) {
            expectedSet.add(i3);
        }
        for (ConsumerRecord rec : recs) {
            resultSet.add(rec.partition());
        }
        Assert.assertTrue((resultSet.size() == 10 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)expectedSet.equals(resultSet));
        for (int i4 = 0; i4 < 10; ++i4) {
            String key = "key-value" + i4;
            String msg = "msg-value" + i4;
            ProducerRecord record = new ProducerRecord(STREAM + ":" + topicname, Integer.valueOf(i4), (Object)key.getBytes(), (Object)msg.getBytes());
            producer.send(record);
        }
        producer.flush();
        TopicPartition[] tps = new TopicPartition[5];
        for (i = 0; i < 5; ++i) {
            tps[i] = new TopicPartition(STREAM + ":" + topicname, i);
        }
        consumer.pause(tps);
        recs = consumer.poll(1000L);
        resultSet = new HashSet();
        expectedSet = new HashSet();
        for (i = 5; i < 10; ++i) {
            expectedSet.add(i);
        }
        for (ConsumerRecord rec : recs) {
            resultSet.add(rec.partition());
        }
        Assert.assertTrue((resultSet.size() == 5 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)expectedSet.equals(resultSet));
        consumer.resume(tps);
        recs = consumer.poll(1000L);
        resultSet = new HashSet();
        expectedSet = new HashSet();
        for (int i5 = 0; i5 < 5; ++i5) {
            expectedSet.add(i5);
        }
        for (ConsumerRecord rec : recs) {
            resultSet.add(rec.partition());
        }
        Assert.assertTrue((resultSet.size() == 5 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)expectedSet.equals(resultSet));
        producer.close();
        consumer.close();
    }

    @Test
    public void testWakeup() throws Exception {
        String topicname = "producertest";
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put(cdef.getMetadataMaxAge(), (Object)3000);
        KafkaConsumer consumer = new KafkaConsumer(props);
        ArrayList<String> topics = new ArrayList<String>(1);
        topics.add(STREAM + ":RANDOMTOPIC");
        consumer.subscribe(topics);
        PollForLongTime worker = new PollForLongTime(consumer);
        Thread workerThread = new Thread(worker);
        workerThread.start();
        Thread.sleep(1000L);
        consumer.wakeup();
        workerThread.join();
        Assert.assertTrue((boolean)worker.verify());
        consumer.close();
    }

    @Test
    public void testPauseResumeSameFeed() throws IOException {
        String topicname = "producertest";
        boolean numPart = true;
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), "false");
        KafkaProducer producer = new KafkaProducer(props);
        props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("fetch.min.bytes", "1");
        props.put(cdef.getMetadataMaxAge(), (Object)2000);
        KafkaConsumer consumer = new KafkaConsumer(props);
        ArrayList<String> topics = new ArrayList<String>(1);
        topics.add(STREAM + ":" + topicname);
        consumer.subscribe(topics);
        TopicPartition[] tps = new TopicPartition[]{new TopicPartition(STREAM + ":" + topicname, 0)};
        for (int i = 0; i < 10; ++i) {
            int numMsgsConsumed = 0;
            consumer.pause(tps);
            String key = "key-value" + i;
            String msg = "msg-value" + i;
            ProducerRecord record = new ProducerRecord(STREAM + ":" + topicname, Integer.valueOf(0), (Object)key.getBytes(), (Object)msg.getBytes());
            producer.send(record);
            consumer.resume(tps);
            producer.flush();
            ConsumerRecords recs = consumer.poll(5000L);
            for (ConsumerRecord rec : recs) {
                String keyStr = new String((byte[])rec.key(), "UTF-8");
                Assert.assertTrue((boolean)keyStr.equals(key));
                ++numMsgsConsumed;
            }
            Assert.assertTrue((numMsgsConsumed == 1 ? 1 : 0) != 0);
            consumer.pause(tps);
        }
        producer.close();
        consumer.close();
    }

    public class PollForLongTime
    implements Runnable {
        private KafkaConsumer consumer;
        private boolean interrupted;
        private boolean correctTypeOfException;

        public PollForLongTime(KafkaConsumer c) {
            this.consumer = c;
            this.interrupted = false;
            this.correctTypeOfException = false;
        }

        @Override
        public void run() {
            block2: {
                try {
                    System.out.println("Polling...");
                    this.consumer.poll(1000000000L);
                    System.out.println("Done polling...");
                }
                catch (Exception e) {
                    System.out.println("Interrupted polling thread with wake up, " + e);
                    this.interrupted = true;
                    if (!(e instanceof WakeupException)) break block2;
                    this.correctTypeOfException = true;
                }
            }
        }

        public boolean verify() {
            return this.interrupted && this.correctTypeOfException;
        }
    }
}

