/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.kafka.KafkaPartitionScanSpec;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.MessageIterator;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRecordReader
extends AbstractRecordReader {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
    public static final long DEFAULT_MESSAGES_PER_BATCH = 4000L;
    private VectorContainerWriter writer;
    private MessageReader messageReader;
    private final boolean unionEnabled;
    private final KafkaStoragePlugin plugin;
    private final KafkaPartitionScanSpec subScanSpec;
    private final long kafkaPollTimeOut;
    private long currentOffset;
    private MessageIterator msgItr;
    private final boolean enableAllTextMode;
    private final boolean readNumbersAsDouble;
    private final String kafkaMsgReader;
    private int currentMessageCount;

    public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context, KafkaStoragePlugin plugin) {
        this.setColumns(projectedColumns);
        OptionManager optionManager = context.getOptions();
        this.enableAllTextMode = optionManager.getBoolean("store.kafka.all_text_mode");
        this.readNumbersAsDouble = optionManager.getBoolean("store.kafka.read_numbers_as_double");
        this.unionEnabled = optionManager.getBoolean("exec.enable_union_type");
        this.kafkaMsgReader = optionManager.getString("store.kafka.record.reader");
        this.kafkaPollTimeOut = optionManager.getLong("store.kafka.poll.timeout");
        this.plugin = plugin;
        this.subScanSpec = subScanSpec;
    }

    protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
        LinkedHashSet transformed = Sets.newLinkedHashSet();
        if (!this.isStarQuery()) {
            for (SchemaPath column : projectedColumns) {
                transformed.add(column);
            }
        } else {
            transformed.add(SchemaPath.STAR_COLUMN);
        }
        return transformed;
    }

    public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
        this.writer = new VectorContainerWriter(output, this.unionEnabled);
        this.messageReader = MessageReaderFactory.getMessageReader(this.kafkaMsgReader);
        this.messageReader.init(context.getManagedBuffer(), Lists.newArrayList((Iterable)this.getColumns()), this.writer, this.enableAllTextMode, this.readNumbersAsDouble);
        this.msgItr = new MessageIterator(this.messageReader.getConsumer(this.plugin), this.subScanSpec, this.kafkaPollTimeOut);
    }

    public int next() {
        this.writer.allocate();
        this.writer.reset();
        Stopwatch watch = Stopwatch.createStarted();
        this.currentMessageCount = 0;
        try {
            while (this.currentOffset < this.subScanSpec.getEndOffset() - 1L && this.msgItr.hasNext()) {
                ConsumerRecord<byte[], byte[]> consumerRecord = this.msgItr.next();
                this.currentOffset = consumerRecord.offset();
                this.writer.setPosition(this.currentMessageCount);
                this.messageReader.readMessage(consumerRecord);
                if ((long)(++this.currentMessageCount) < 4000L) continue;
                break;
            }
            if (this.currentMessageCount > 0) {
                this.messageReader.ensureAtLeastOneField();
            }
            this.writer.setValueCount(this.currentMessageCount);
            logger.debug("Took {} ms to process {} records.", (Object)watch.elapsed(TimeUnit.MILLISECONDS), (Object)this.currentMessageCount);
            logger.debug("Last offset consumed for {}:{} is {}", new Object[]{this.subScanSpec.getTopicName(), this.subScanSpec.getPartitionId(), this.currentOffset});
            return this.currentMessageCount;
        }
        catch (Exception e) {
            String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (this.currentMessageCount + 1);
            throw UserException.dataReadError((Throwable)e).message(msg, new Object[0]).addContext(e.getMessage()).build(logger);
        }
    }

    public void close() throws Exception {
        logger.info("Last offset processed for {}:{} is - {}", new Object[]{this.subScanSpec.getTopicName(), this.subScanSpec.getPartitionId(), this.currentOffset});
        logger.info("Total time to fetch messages from {}:{} is - {} milliseconds", new Object[]{this.subScanSpec.getTopicName(), this.subScanSpec.getPartitionId(), this.msgItr.getTotalFetchTime()});
        this.messageReader.close();
    }

    public String toString() {
        return "KafkaRecordReader[messageReader=" + this.messageReader + ", kafkaPollTimeOut=" + this.kafkaPollTimeOut + ", currentOffset=" + this.currentOffset + ", enableAllTextMode=" + this.enableAllTextMode + ", readNumbersAsDouble=" + this.readNumbersAsDouble + ", kafkaMsgReader=" + this.kafkaMsgReader + ", currentMessageCount=" + this.currentMessageCount + "]";
    }
}

