package org.apache.drill.exec.store.kafka;

import java.io.IOException;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/store/kafka/KafkaRecordReader.class */
public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
    private final ReadOptions readOptions;
    private final KafkaStoragePlugin plugin;
    private final KafkaPartitionScanSpec subScanSpec;
    private final int maxRecords;
    private MessageReader messageReader;
    private long currentOffset;
    private MessageIterator msgItr;

    public KafkaRecordReader(KafkaPartitionScanSpec kafkaPartitionScanSpec, OptionManager optionManager, KafkaStoragePlugin kafkaStoragePlugin, int i) {
        this.readOptions = new ReadOptions(optionManager);
        this.plugin = kafkaStoragePlugin;
        this.subScanSpec = kafkaPartitionScanSpec;
        this.maxRecords = i;
    }

    public boolean open(SchemaNegotiator schemaNegotiator) {
        schemaNegotiator.setErrorContext(new ChildErrorContext(schemaNegotiator.parentErrorContext()) { // from class: org.apache.drill.exec.store.kafka.KafkaRecordReader.1
            public void addContext(UserException.Builder builder) {
                super.addContext(builder);
                builder.addContext("topic_name", KafkaRecordReader.this.subScanSpec.getTopicName());
            }
        });
        schemaNegotiator.limit(this.maxRecords);
        this.messageReader = MessageReaderFactory.getMessageReader(this.readOptions.getMessageReader());
        this.messageReader.init(schemaNegotiator, this.readOptions, this.plugin);
        this.msgItr = new MessageIterator(this.messageReader.getConsumer(this.plugin), this.subScanSpec, this.readOptions.getPollTimeOut());
        return true;
    }

    public boolean next() {
        RowSetLoader writer = this.messageReader.getResultSetLoader().writer();
        while (!writer.isFull()) {
            if (!nextLine(writer)) {
                return false;
            }
        }
        return this.messageReader.endBatch();
    }

    private boolean nextLine(RowSetLoader rowSetLoader) {
        if (this.currentOffset >= this.subScanSpec.getEndOffset() || !this.msgItr.hasNext()) {
            return false;
        }
        ConsumerRecord<byte[], byte[]> next = this.msgItr.next();
        this.currentOffset = next.offset();
        this.messageReader.readMessage(next);
        return true;
    }

    public void close() {
        logger.debug("Last offset processed for {}:{} is - {}", new Object[]{this.subScanSpec.getTopicName(), Integer.valueOf(this.subScanSpec.getPartitionId()), Long.valueOf(this.currentOffset)});
        logger.debug("Total time to fetch messages from {}:{} is - {} milliseconds", new Object[]{this.subScanSpec.getTopicName(), Integer.valueOf(this.subScanSpec.getPartitionId()), Long.valueOf(this.msgItr.getTotalFetchTime())});
        this.plugin.registerToClose(this.msgItr);
        try {
            this.messageReader.close();
        } catch (IOException e) {
            logger.warn("Error closing Kafka message reader: {}", e.getMessage(), e);
        }
    }

    public String toString() {
        return new PlanStringBuilder(this).field("readOptions", this.readOptions).field("currentOffset", Long.valueOf(this.currentOffset)).toString();
    }
}
