package org.apache.drill.exec.store.kafka.decoders;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.ColumnConverter;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.store.avro.AvroColumnConverterFactory;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.MetaDataField;
import org.apache.drill.exec.store.kafka.ReadOptions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.class */
public class AvroMessageReader implements MessageReader {
    private static final Logger logger = LoggerFactory.getLogger(AvroMessageReader.class);
    private KafkaAvroDeserializer deserializer;
    private ColumnConverter converter;
    private ResultSetLoader loader;
    private boolean deserializeKey;

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public void init(SchemaNegotiator schemaNegotiator, ReadOptions readOptions, KafkaStoragePlugin kafkaStoragePlugin) {
        Properties kafkaConsumerProps = kafkaStoragePlugin.m8getConfig().getKafkaConsumerProps();
        this.deserializer = new KafkaAvroDeserializer((SchemaRegistryClient) null, (Map) kafkaConsumerProps.entrySet().stream().collect(Collectors.toMap(entry -> {
            return entry.getKey().toString();
        }, (v0) -> {
            return v0.getValue();
        })));
        TupleMetadata providedSchema = schemaNegotiator.providedSchema();
        this.loader = schemaNegotiator.build();
        this.converter = new AvroColumnConverterFactory(providedSchema).getRootConverter(providedSchema, new TupleSchema(), this.loader.writer());
        String property = kafkaConsumerProps.getProperty("key.deserializer");
        this.deserializeKey = property != null && property.equals(KafkaAvroDeserializer.class.getName());
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public void readMessage(ConsumerRecord<?, ?> consumerRecord) {
        RowSetLoader writer = this.loader.writer();
        GenericRecord genericRecord = (GenericRecord) this.deserializer.deserialize((String) null, (byte[]) consumerRecord.value());
        Schema schema = genericRecord.getSchema();
        if (Schema.Type.RECORD != schema.getType()) {
            throw UserException.dataReadError().message(String.format("Root object must be record type. Found: %s", schema.getType()), new Object[0]).addContext("Reader", new Object[]{this}).build(logger);
        }
        writer.start();
        this.converter.convert(genericRecord);
        writeValue(writer, MetaDataField.KAFKA_TOPIC, consumerRecord.topic());
        writeValue(writer, MetaDataField.KAFKA_PARTITION_ID, Integer.valueOf(consumerRecord.partition()));
        writeValue(writer, MetaDataField.KAFKA_OFFSET, Long.valueOf(consumerRecord.offset()));
        writeValue(writer, MetaDataField.KAFKA_TIMESTAMP, Long.valueOf(consumerRecord.timestamp()));
        writeValue(writer, MetaDataField.KAFKA_MSG_KEY, consumerRecord.key() != null ? getKeyValue((byte[]) consumerRecord.key()) : null);
        writer.save();
    }

    private Object getKeyValue(byte[] bArr) {
        return this.deserializeKey ? this.deserializer.deserialize((String) null, bArr).toString() : new String(bArr);
    }

    private <T> void writeValue(RowSetLoader rowSetLoader, MetaDataField metaDataField, T t) {
        if (rowSetLoader.tupleSchema().column(metaDataField.getFieldName()) == null) {
            rowSetLoader.addColumn(MetadataUtils.newScalar(metaDataField.getFieldName(), metaDataField.getFieldType(), TypeProtos.DataMode.OPTIONAL));
        }
        rowSetLoader.column(metaDataField.getFieldName()).setObject(t);
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin kafkaStoragePlugin) {
        return new KafkaConsumer<>(kafkaStoragePlugin.m8getConfig().getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public ResultSetLoader getResultSetLoader() {
        return this.loader;
    }

    @Override // org.apache.drill.exec.store.kafka.decoders.MessageReader
    public boolean endBatch() {
        return this.loader.hasRows();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.deserializer.close();
            this.loader.close();
        } catch (Exception e) {
            logger.warn("Error while closing AvroMessageReader: {}", e.getMessage());
        }
    }
}
