package org.apache.drill.exec.store.kafka;

import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.store.kafka.KafkaSubScan;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for {@link MessageIterator}, run against the Kafka consumer configured by
 * {@link KafkaTestBase}.
 *
 * <p>Each test gets a fresh {@link KafkaConsumer} limited to {@value #MAX_POLL_RECORDS} records
 * per poll and a sub-scan spec over {@code TestQueryConstants.JSON_TOPIC}.
 */
@Category({KafkaStorageTest.class, SlowTest.class})
public class MessageIteratorTest extends KafkaTestBase {

    /** Value set for the consumer's {@code max.poll.records} property. */
    private static final int MAX_POLL_RECORDS = 4;

    /**
     * Used both as the last argument of the sub-scan spec and as the number of messages the
     * iterator is expected to return in total.
     */
    private static final long MESSAGE_COUNT = 10L;

    /** Poll timeout, in milliseconds, used by the tests that expect messages to be fetched. */
    private static final long POLL_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1L);

    /**
     * Poll timeout, in milliseconds, used to provoke the fetch failure. The expected error
     * message in {@link #testWhenPollTimeOutIsTooLess()} spells this value out literally.
     */
    private static final long TOO_SHORT_POLL_TIMEOUT_MS = 1L;

    private KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private KafkaSubScan.KafkaSubScanSpec subScanSpec;

    /** Creates the byte-array consumer and the sub-scan spec shared by every test. */
    @Before
    public void setUp() {
        // NOTE(review): this writes into the Properties object returned by the plugin config;
        // confirm whether that object is shared with other tests.
        Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class);
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
        consumerProps.put("max.poll.records", String.valueOf(MAX_POLL_RECORDS));
        kafkaConsumer = new KafkaConsumer<>(consumerProps);
        // Presumably (topic, partition, start offset, end offset) - verify against KafkaSubScanSpec.
        subScanSpec = new KafkaSubScan.KafkaSubScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0L, MESSAGE_COUNT);
    }

    /** Closes the consumer created in {@link #setUp()}, if any. */
    @After
    public void cleanUp() {
        if (kafkaConsumer != null) {
            kafkaConsumer.close();
        }
    }

    /**
     * With a {@value #TOO_SHORT_POLL_TIMEOUT_MS} ms poll timeout, {@code hasNext()} must fail
     * with a {@code DATA_READ} {@link UserException} that names the timeout property.
     */
    @Test
    public void testWhenPollTimeOutIsTooLess() {
        MessageIterator iterator =
            new MessageIterator(kafkaConsumer, subScanSpec, TOO_SHORT_POLL_TIMEOUT_MS);
        try {
            iterator.hasNext();
            Assert.fail("Test passed even though there are no message fetched.");
        } catch (UserException e) {
            Assert.assertEquals(UserBitShared.DrillPBError.ErrorType.DATA_READ, e.getErrorType());
            Assert.assertTrue(e.getMessage().contains("DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout"));
        }
    }

    /** {@code hasNext()} must report {@code true} when messages can be fetched. */
    @Test
    public void testShouldReturnTrueAsKafkaHasMessages() {
        MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, POLL_TIMEOUT_MS);
        Assert.assertTrue("Message iterator returned false though there are messages in Kafka", iterator.hasNext());
    }

    /**
     * After a single {@code hasNext()}, exactly {@value #MAX_POLL_RECORDS} messages must be
     * available through {@code next()}; one more call must throw {@link NoSuchElementException}.
     */
    @Test
    public void testShouldReturnMessage1() {
        MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, POLL_TIMEOUT_MS);
        // Check the fetch result explicitly so that an empty fetch is reported as such rather
        // than as an unexplained NoSuchElementException from the first next() below.
        Assert.assertTrue("Message iterator returned false though there are messages in Kafka", iterator.hasNext());
        for (int fetched = 0; fetched < MAX_POLL_RECORDS; fetched++) {
            Assert.assertNotNull(iterator.next());
        }
        try {
            iterator.next();
            Assert.fail("Kafka fetched more messages than configured.");
        } catch (NoSuchElementException ignored) {
            // Expected: the current batch holds no more than MAX_POLL_RECORDS messages.
        }
    }

    /** Draining the iterator must yield exactly {@value #MESSAGE_COUNT} non-null messages. */
    @Test
    public void testShouldReturnMessage2() {
        MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, POLL_TIMEOUT_MS);
        int messagesRead = 0;
        while (iterator.hasNext()) {
            Assert.assertNotNull(iterator.next());
            messagesRead++;
        }
        Assert.assertEquals(MESSAGE_COUNT, messagesRead);
    }
}
