/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import java.io.IOException;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.kafka.KafkaPartitionScanSpec;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.MessageIterator;
import org.apache.drill.exec.store.kafka.ReadOptions;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch reader that pulls the messages of one Kafka partition scan spec
 * through a {@link MessageIterator} and hands each record to the configured
 * {@link MessageReader}.
 *
 * <p>The message reader and the message iterator are created in
 * {@link #open(SchemaNegotiator)}; until that call completes both fields are
 * {@code null}, which {@link #close()} tolerates.
 */
public class KafkaRecordReader
implements ManagedReader<SchemaNegotiator> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
    private final ReadOptions readOptions;
    private final KafkaStoragePlugin plugin;
    private final KafkaPartitionScanSpec subScanSpec;
    private final int maxRecords;
    // Assigned in open(); null if open() was never called or failed early.
    private MessageReader messageReader;
    // Offset of the most recently consumed record (0 until a record is read).
    private long currentOffset;
    // Assigned in open(); null if open() was never called or failed early.
    private MessageIterator msgItr;

    /**
     * Creates a reader for a single partition scan spec.
     *
     * @param subScanSpec topic/partition/offset description of what to read
     * @param options     option manager used to build the {@link ReadOptions}
     * @param plugin      owning Kafka storage plugin
     * @param maxRecords  record limit passed to the schema negotiator on open
     */
    public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager options, KafkaStoragePlugin plugin, int maxRecords) {
        this.readOptions = new ReadOptions(options);
        this.plugin = plugin;
        this.subScanSpec = subScanSpec;
        this.maxRecords = maxRecords;
    }

    /**
     * Prepares the reader: installs an error context that adds the topic name,
     * forwards the record limit to the negotiator, creates and initialises the
     * message reader selected by the read options, and builds the message
     * iterator over the consumer supplied by that message reader.
     *
     * @param negotiator schema negotiator supplied by the scan framework
     * @return always {@code true}
     */
    public boolean open(SchemaNegotiator negotiator) {
        CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {

            public void addContext(UserException.Builder builder) {
                super.addContext(builder);
                builder.addContext("topic_name", subScanSpec.getTopicName());
            }
        };
        negotiator.setErrorContext(errorContext);
        negotiator.limit(maxRecords);

        messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
        messageReader.init(negotiator, readOptions, plugin);
        msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
        return true;
    }

    /**
     * Consumes messages until the row writer reports that it is full.
     *
     * @return {@code false} as soon as no further message can be read;
     *         otherwise the result of {@link MessageReader#endBatch()}
     */
    public boolean next() {
        RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
        while (!rowWriter.isFull()) {
            if (!nextLine()) {
                return false;
            }
        }
        return messageReader.endBatch();
    }

    /**
     * Reads one record, if any remains, and passes it to the message reader.
     *
     * @return {@code true} if a record was consumed; {@code false} when the
     *         last consumed offset has reached the spec's end offset or the
     *         iterator has nothing more to offer
     */
    private boolean nextLine() {
        if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
            return false;
        }
        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
        currentOffset = consumerRecord.offset();
        messageReader.readMessage(consumerRecord);
        return true;
    }

    /**
     * Logs the last processed offset and total fetch time, registers the
     * message iterator with the plugin for closing, and closes the message
     * reader. An {@link IOException} from the message reader is logged at
     * warn level and not rethrown.
     *
     * <p>Both the iterator and the message reader are only touched when they
     * were actually created, so calling this after {@link #open} failed part
     * way through does not raise a {@link NullPointerException}.
     */
    public void close() {
        logger.debug("Last offset processed for {}:{} is - {}",
            subScanSpec.getTopicName(), subScanSpec.getPartitionId(), currentOffset);

        // msgItr is created last in open(); it is null if open() did not complete.
        if (msgItr != null) {
            logger.debug("Total time to fetch messages from {}:{} is - {} milliseconds",
                subScanSpec.getTopicName(), subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
            plugin.registerToClose(msgItr);
        }

        // messageReader may exist even when msgItr does not (e.g. init() threw).
        if (messageReader != null) {
            try {
                messageReader.close();
            }
            catch (IOException e) {
                logger.warn("Error closing Kafka message reader: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * @return a plan-style string containing the read options and the offset
     *         of the most recently consumed record
     */
    public String toString() {
        return new PlanStringBuilder(this)
            .field("readOptions", readOptions)
            .field("currentOffset", currentOffset)
            .toString();
    }
}

