/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.store.kafka.KafkaPartitionScanSpec;
import org.apache.drill.exec.store.kafka.KafkaTestBase;
import org.apache.drill.exec.store.kafka.MessageIterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={KafkaStorageTest.class, SlowTest.class})
public class MessageIteratorTest
extends KafkaTestBase {
    private KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private KafkaPartitionScanSpec subScanSpec;

    @Before
    public void setUp() {
        Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class);
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
        consumerProps.put("max.poll.records", "4");
        this.kafkaConsumer = new KafkaConsumer(consumerProps);
        this.subScanSpec = new KafkaPartitionScanSpec("drill-json-topic", 0, 0L, 10L);
    }

    @After
    public void cleanUp() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
    }

    @Test
    public void testWhenPollTimeOutIsTooLess() {
        MessageIterator iterator = new MessageIterator(this.kafkaConsumer, this.subScanSpec, 1L);
        try {
            iterator.hasNext();
            Assert.fail((String)"Test passed even though there are no message fetched.");
        }
        catch (UserException ue) {
            Assert.assertEquals((Object)UserBitShared.DrillPBError.ErrorType.DATA_READ, (Object)ue.getErrorType());
            Assert.assertTrue((boolean)ue.getMessage().contains("DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout"));
        }
    }

    @Test
    public void testShouldReturnTrueAsKafkaHasMessages() {
        MessageIterator iterator = new MessageIterator(this.kafkaConsumer, this.subScanSpec, TimeUnit.SECONDS.toMillis(1L));
        Assert.assertTrue((String)"Message iterator returned false though there are messages in Kafka", (boolean)iterator.hasNext());
    }

    @Test
    public void testShouldReturnMessage1() {
        MessageIterator iterator = new MessageIterator(this.kafkaConsumer, this.subScanSpec, TimeUnit.SECONDS.toMillis(1L));
        iterator.hasNext();
        Assert.assertNotNull((Object)iterator.next());
        Assert.assertNotNull((Object)iterator.next());
        Assert.assertNotNull((Object)iterator.next());
        Assert.assertNotNull((Object)iterator.next());
        try {
            iterator.next();
            Assert.fail((String)"Kafak fetched more messages than configured.");
        }
        catch (NoSuchElementException noSuchElementException) {
            // empty catch block
        }
    }

    @Test
    public void testShouldReturnMessage2() {
        MessageIterator iterator = new MessageIterator(this.kafkaConsumer, this.subScanSpec, TimeUnit.SECONDS.toMillis(1L));
        int messageCount = 0;
        while (iterator.hasNext()) {
            ConsumerRecord consumerRecord = iterator.next();
            Assert.assertNotNull((Object)consumerRecord);
            ++messageCount;
        }
        Assert.assertEquals((long)10L, (long)messageCount);
    }
}

