package org.apache.drill.exec.store.kafka;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/store/kafka/KafkaRecordReader.class */
/**
 * Record reader that pulls messages for a single Kafka partition scan spec and
 * writes them into Drill value vectors through a {@link VectorContainerWriter}.
 *
 * <p>Session options are read once in the constructor; the writer, the message
 * decoder and the message iterator are created in
 * {@link #setup(OperatorContext, OutputMutator)}. Each call to {@link #next()}
 * produces one batch of at most {@link #DEFAULT_MESSAGES_PER_BATCH} messages.
 */
public class KafkaRecordReader extends AbstractRecordReader {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);

    /** Upper bound on the number of messages written into one batch by {@link #next()}. */
    public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;

    // Created in setup(); null until setup() has run.
    private VectorContainerWriter writer;
    private MessageReader messageReader;
    private final boolean unionEnabled;
    private final KafkaStoragePlugin plugin;
    private final KafkaPartitionScanSpec subScanSpec;
    private final long kafkaPollTimeOut;
    // Offset of the most recently consumed record; stays 0 until a record has been read.
    private long currentOffset;
    // Created in setup(); null until setup() has run.
    private MessageIterator msgItr;
    private final boolean enableAllTextMode;
    private final boolean readNumbersAsDouble;
    private final String kafkaMsgReader;
    // Number of messages written into the batch currently being built.
    private int currentMessageCount;

    /**
     * Creates a reader for one partition scan spec.
     *
     * @param kafkaPartitionScanSpec scan spec supplying topic, partition and end offset
     * @param list columns to project; passed to {@code setColumns}
     * @param fragmentContext source of the option values read here
     * @param kafkaStoragePlugin plugin later handed to the message reader to obtain a consumer
     */
    public KafkaRecordReader(KafkaPartitionScanSpec kafkaPartitionScanSpec, List<SchemaPath> list, FragmentContext fragmentContext, KafkaStoragePlugin kafkaStoragePlugin) {
        setColumns(list);
        OptionManager options = fragmentContext.getOptions();
        this.enableAllTextMode = options.getBoolean("store.kafka.all_text_mode");
        this.readNumbersAsDouble = options.getBoolean("store.kafka.read_numbers_as_double");
        this.unionEnabled = options.getBoolean("exec.enable_union_type");
        this.kafkaMsgReader = options.getString("store.kafka.record.reader");
        this.kafkaPollTimeOut = options.getLong("store.kafka.poll.timeout");
        this.plugin = kafkaStoragePlugin;
        this.subScanSpec = kafkaPartitionScanSpec;
    }

    /**
     * Builds the set of columns to read, preserving insertion order.
     *
     * @param collection the projected columns
     * @return a set holding only {@code SchemaPath.STAR_COLUMN} when
     *         {@code isStarQuery()} is true, otherwise every element of
     *         {@code collection} with duplicates removed
     */
    protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> collection) {
        LinkedHashSet<SchemaPath> transformed = Sets.newLinkedHashSet();
        if (isStarQuery()) {
            transformed.add(SchemaPath.STAR_COLUMN);
        } else {
            transformed.addAll(collection);
        }
        return transformed;
    }

    /**
     * Creates the vector writer, the message decoder selected by the
     * {@code store.kafka.record.reader} option, and the message iterator.
     *
     * @param operatorContext supplies the managed buffer handed to the decoder
     * @param outputMutator target of the vector writer
     * @throws ExecutionSetupException declared by the signature; not thrown directly here
     */
    public void setup(OperatorContext operatorContext, OutputMutator outputMutator) throws ExecutionSetupException {
        this.writer = new VectorContainerWriter(outputMutator, this.unionEnabled);
        this.messageReader = MessageReaderFactory.getMessageReader(this.kafkaMsgReader);
        this.messageReader.init(operatorContext.getManagedBuffer(), Lists.newArrayList(getColumns()), this.writer, this.enableAllTextMode, this.readNumbersAsDouble);
        this.msgItr = new MessageIterator(this.messageReader.getConsumer(this.plugin), this.subScanSpec, this.kafkaPollTimeOut);
    }

    /**
     * Reads the next batch of messages into the writer.
     *
     * <p>Reading stops when the last consumed offset reaches
     * {@code getEndOffset() - 1}, when the iterator has no more records, or when
     * {@link #DEFAULT_MESSAGES_PER_BATCH} messages have been written.
     *
     * @return the number of messages written into this batch
     * @throws UserException (data read error) wrapping any exception raised while
     *         fetching or decoding a record
     */
    public int next() {
        this.writer.allocate();
        this.writer.reset();
        Stopwatch watch = Stopwatch.createStarted();
        this.currentMessageCount = 0;
        // NOTE(review): currentOffset starts at 0, so when getEndOffset() is 1 this guard is
        // false on the first call and nothing is read even if the iterator has a record.
        // Confirm against MessageIterator / scan-spec semantics whether that is intended.
        while (this.currentOffset < this.subScanSpec.getEndOffset() - 1 && this.msgItr.hasNext()) {
            try {
                ConsumerRecord<byte[], byte[]> consumerRecord = this.msgItr.next();
                this.currentOffset = consumerRecord.offset();
                this.writer.setPosition(this.currentMessageCount);
                this.messageReader.readMessage(consumerRecord);
                if (++this.currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
                    break;
                }
            } catch (Exception e) {
                // currentMessageCount is only incremented after a successful read, so +1
                // identifies the record that failed.
                String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (this.currentMessageCount + 1);
                throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
            }
        }
        if (this.currentMessageCount > 0) {
            this.messageReader.ensureAtLeastOneField();
        }
        this.writer.setValueCount(this.currentMessageCount);
        logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), this.currentMessageCount);
        logger.debug("Last offset consumed for {}:{} is {}", this.subScanSpec.getTopicName(), this.subScanSpec.getPartitionId(), this.currentOffset);
        return this.currentMessageCount;
    }

    /**
     * Logs the final offset and total fetch time, then closes the message reader.
     *
     * <p>The iterator and the message reader are only created in
     * {@link #setup(OperatorContext, OutputMutator)}, so both are null-checked:
     * calling this method on a reader whose setup never completed must not fail
     * with a {@link NullPointerException}.
     *
     * @throws Exception if closing the message reader fails
     */
    public void close() throws Exception {
        logger.info("Last offset processed for {}:{} is - {}", this.subScanSpec.getTopicName(), this.subScanSpec.getPartitionId(), this.currentOffset);
        if (this.msgItr != null) {
            logger.info("Total time to fetch messages from {}:{} is - {} milliseconds", this.subScanSpec.getTopicName(), this.subScanSpec.getPartitionId(), this.msgItr.getTotalFetchTime());
        }
        if (this.messageReader != null) {
            this.messageReader.close();
        }
    }

    /** Returns a diagnostic summary of the reader's configuration and current position. */
    @Override
    public String toString() {
        return "KafkaRecordReader[messageReader=" + this.messageReader
            + ", kafkaPollTimeOut=" + this.kafkaPollTimeOut
            + ", currentOffset=" + this.currentOffset
            + ", enableAllTextMode=" + this.enableAllTextMode
            + ", readNumbersAsDouble=" + this.readNumbersAsDouble
            + ", kafkaMsgReader=" + this.kafkaMsgReader
            + ", currentMessageCount=" + this.currentMessageCount + "]";
    }
}
