package org.apache.drill.exec.store.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import kafka.common.KafkaException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/store/kafka/MessageIterator.class */
public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
    private static final Logger logger = LoggerFactory.getLogger(MessageIterator.class);
    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private Iterator<ConsumerRecord<byte[], byte[]>> recordIter;
    private final TopicPartition topicPartition;
    private long totalFetchTime = 0;
    private final long kafkaPollTimeOut;
    private final long endOffset;

    public MessageIterator(KafkaConsumer<byte[], byte[]> kafkaConsumer, KafkaPartitionScanSpec kafkaPartitionScanSpec, long j) {
        this.kafkaConsumer = kafkaConsumer;
        this.kafkaPollTimeOut = j;
        ArrayList arrayList = new ArrayList(1);
        this.topicPartition = new TopicPartition(kafkaPartitionScanSpec.getTopicName(), kafkaPartitionScanSpec.getPartitionId());
        arrayList.add(this.topicPartition);
        this.kafkaConsumer.assign(arrayList);
        logger.info("Start offset of {}:{} is - {}", new Object[]{kafkaPartitionScanSpec.getTopicName(), Integer.valueOf(kafkaPartitionScanSpec.getPartitionId()), Long.valueOf(kafkaPartitionScanSpec.getStartOffset())});
        this.kafkaConsumer.seek(this.topicPartition, kafkaPartitionScanSpec.getStartOffset());
        this.endOffset = kafkaPartitionScanSpec.getEndOffset();
    }

    @Override // java.util.Iterator
    public void remove() {
        throw new UnsupportedOperationException("Does not support remove operation");
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        if (this.recordIter != null && this.recordIter.hasNext()) {
            return true;
        }
        if (this.kafkaConsumer.position(this.topicPartition) >= this.endOffset) {
            return false;
        }
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            ConsumerRecords poll = this.kafkaConsumer.poll(Duration.ofMillis(this.kafkaPollTimeOut));
            createStarted.stop();
            if (poll.isEmpty()) {
                throw UserException.dataReadError().message("Failed to fetch messages within " + this.kafkaPollTimeOut + " milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout", new Object[0]).build(logger);
            }
            long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
            logger.debug("Total number of messages fetched : {}", Integer.valueOf(poll.count()));
            logger.debug("Time taken to fetch : {} milliseconds", Long.valueOf(elapsed));
            this.totalFetchTime += elapsed;
            this.recordIter = poll.iterator();
            return this.recordIter.hasNext();
        } catch (KafkaException e) {
            logger.error(e.getMessage(), e);
            throw UserException.dataReadError(e).message(e.getMessage(), new Object[0]).build(logger);
        }
    }

    public long getTotalFetchTime() {
        return this.totalFetchTime;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Iterator
    public ConsumerRecord<byte[], byte[]> next() {
        return this.recordIter.next();
    }
}
