/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafkarest.ConsumerManager;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.KafkaStreamsMetadataObserver;
import io.confluent.kafkarest.SimpleConsumerFactory;
import io.confluent.kafkarest.SimpleConsumerPool;
import io.confluent.kafkarest.SimpleConsumerRecordsCache;
import io.confluent.kafkarest.TPConsumerState;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.converters.AvroConverter;
import io.confluent.kafkarest.entities.AbstractConsumerRecord;
import io.confluent.kafkarest.entities.AvroConsumerRecord;
import io.confluent.kafkarest.entities.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.JsonConsumerRecord;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.ws.rs.core.Response;
import jersey.repackaged.com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumerManager.class);
    private final int maxPoolSize;
    private final int poolInstanceAvailabilityTimeoutMs;
    private final Time time;
    private final KafkaStreamsMetadataObserver mdObserver;
    private final SimpleConsumerFactory simpleConsumerFactory;
    private final SimpleConsumerPool simpleConsumersPool;
    private final SimpleConsumerRecordsCache cache;
    private final Deserializer<Object> avroDeserializer;
    private final ObjectMapper objectMapper;

    public SimpleConsumerManager(KafkaRestConfig config, KafkaStreamsMetadataObserver mdObserver, SimpleConsumerFactory simpleConsumerFactory) {
        this.mdObserver = mdObserver;
        this.simpleConsumerFactory = simpleConsumerFactory;
        this.maxPoolSize = config.getInt("simpleconsumer.pool.size.max");
        this.poolInstanceAvailabilityTimeoutMs = config.getInt("simpleconsumer.pool.timeout.ms");
        this.time = config.getTime();
        this.simpleConsumersPool = new SimpleConsumerPool(this.maxPoolSize, this.poolInstanceAvailabilityTimeoutMs, this.time, simpleConsumerFactory, mdObserver);
        this.cache = new SimpleConsumerRecordsCache(config);
        Properties props = new Properties();
        props.setProperty("schema.registry.url", config.getString("schema.registry.url"));
        this.avroDeserializer = new KafkaAvroDeserializer();
        this.avroDeserializer.configure((Map)Maps.fromProperties((Properties)props), true);
        this.objectMapper = new ObjectMapper();
    }

    public void consume(String topicName, int partitionId, long offset, long count, EmbeddedFormat embeddedFormat, ConsumerManager.ReadCallback callback) {
        ArrayList<AbstractConsumerRecord> records = new ArrayList<AbstractConsumerRecord>();
        RestException exception = null;
        if (!this.mdObserver.topicExists(topicName)) {
            exception = Errors.topicNotFoundException();
        } else if (!this.mdObserver.partitionExists(topicName, partitionId)) {
            exception = Errors.partitionNotFoundException();
        } else {
            try (TPConsumerState consumer = this.simpleConsumersPool.get(topicName, partitionId);){
                List<ConsumerRecord<byte[], byte[]>> fetched = this.cache.pollRecords(consumer.consumer(), topicName, partitionId, offset, count);
                for (ConsumerRecord<byte[], byte[]> record : fetched) {
                    records.add(this.createConsumerRecord(record, record.topic(), record.partition(), embeddedFormat));
                }
            }
            catch (Throwable e) {
                if (e instanceof RestException) {
                    exception = (RestException)e;
                }
                exception = Errors.kafkaErrorException(e);
                log.warn("Internal error", e);
            }
        }
        callback.onCompletion(records, (Exception)exception);
    }

    private BinaryConsumerRecord createBinaryConsumerRecord(ConsumerRecord<byte[], byte[]> consumerRecord, String topicName, int partitionId) {
        return new BinaryConsumerRecord((byte[])consumerRecord.key(), (byte[])consumerRecord.value(), topicName, partitionId, consumerRecord.offset());
    }

    private AvroConsumerRecord createAvroConsumerRecord(ConsumerRecord<byte[], byte[]> consumerRecord, String topicName, int partitionId) {
        return new AvroConsumerRecord(AvroConverter.toJson((Object)this.avroDeserializer.deserialize((String)topicName, (byte[])((byte[])consumerRecord.key()))).json, AvroConverter.toJson((Object)this.avroDeserializer.deserialize((String)topicName, (byte[])((byte[])consumerRecord.value()))).json, topicName, partitionId, consumerRecord.offset());
    }

    private JsonConsumerRecord createJsonConsumerRecord(ConsumerRecord<byte[], byte[]> consumerRecord, String topicName, int partitionId) {
        return new JsonConsumerRecord(this.deserializeJson((byte[])consumerRecord.key()), this.deserializeJson((byte[])consumerRecord.value()), topicName, partitionId, consumerRecord.offset());
    }

    private Object deserializeJson(byte[] data) {
        try {
            return data == null ? null : this.objectMapper.readValue(data, Object.class);
        }
        catch (Exception e) {
            throw new SerializationException((Throwable)e);
        }
    }

    private AbstractConsumerRecord createConsumerRecord(ConsumerRecord<byte[], byte[]> consumerRecord, String topicName, int partitionId, EmbeddedFormat embeddedFormat) {
        switch (embeddedFormat) {
            case BINARY: {
                return this.createBinaryConsumerRecord(consumerRecord, topicName, partitionId);
            }
            case AVRO: {
                return this.createAvroConsumerRecord(consumerRecord, topicName, partitionId);
            }
            case JSON: {
                return this.createJsonConsumerRecord(consumerRecord, topicName, partitionId);
            }
        }
        throw new RestServerErrorException("Invalid embedded format for new consumer.", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    public void shutdown() {
        this.simpleConsumersPool.shutdown();
    }
}

