/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaJsonSerializer;
import io.confluent.kafkarest.AvroRestProducer;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.NoSchemaRestProducer;
import io.confluent.kafkarest.ProduceTask;
import io.confluent.kafkarest.RecordMetadataOrException;
import io.confluent.kafkarest.RestProducer;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.ProduceRecord;
import io.confluent.kafkarest.entities.SchemaHolder;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerPool {
    private static final Logger log = LoggerFactory.getLogger(ProducerPool.class);
    private Map<EmbeddedFormat, RestProducer> kafkaProducers = new HashMap<EmbeddedFormat, RestProducer>();
    private Map<EmbeddedFormat, RestProducer> streamsProducers = new HashMap<EmbeddedFormat, RestProducer>();
    private SimpleProducerCache producerCache;
    private boolean isStreams;
    private boolean defaultStreamSet;
    private boolean isImpersonationEnabled;
    private Map<String, Object> binaryStreamProps;
    private Map<String, Object> jsonStreamProps;

    public ProducerPool(KafkaRestConfig appConfig) {
        this(appConfig, null);
    }

    public ProducerPool(KafkaRestConfig appConfig, Properties producerConfigOverrides) {
        this(appConfig, appConfig.bootstrapBrokers(), producerConfigOverrides);
    }

    public ProducerPool(KafkaRestConfig appConfig, String bootstrapBrokers, Properties producerConfigOverrides) {
        this.isStreams = appConfig.isStreams();
        this.defaultStreamSet = appConfig.isDefaultStreamSet();
        this.isImpersonationEnabled = appConfig.isImpersonationEnabled();
        if (!this.isStreams) {
            Map<String, Object> binaryProps = this.buildStandardConfig(appConfig, bootstrapBrokers, producerConfigOverrides);
            this.kafkaProducers.put(EmbeddedFormat.BINARY, this.buildBinaryProducer(binaryProps));
            Map<String, Object> jsonProps = this.buildStandardConfig(appConfig, bootstrapBrokers, producerConfigOverrides);
            this.kafkaProducers.put(EmbeddedFormat.JSON, this.buildJsonProducer(jsonProps));
            Map<String, Object> avroProps = this.buildAvroConfig(appConfig, bootstrapBrokers, producerConfigOverrides);
            this.kafkaProducers.put(EmbeddedFormat.AVRO, this.buildAvroProducer(avroProps));
        }
        this.binaryStreamProps = this.buildStandardConfig(appConfig, bootstrapBrokers, producerConfigOverrides);
        this.jsonStreamProps = this.buildStandardConfig(appConfig, bootstrapBrokers, producerConfigOverrides);
        if (!this.isImpersonationEnabled) {
            this.streamsProducers.put(EmbeddedFormat.BINARY, this.buildBinaryProducer(this.binaryStreamProps));
            this.streamsProducers.put(EmbeddedFormat.JSON, this.buildJsonProducer(this.jsonStreamProps));
        } else {
            this.producerCache = new SimpleProducerCache(appConfig);
        }
    }

    private Map<String, Object> buildStandardConfig(KafkaRestConfig appConfig, String bootstrapBrokers, Properties producerConfigOverrides) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", bootstrapBrokers);
        Properties producerProps = appConfig.getProducerProperties();
        String defaultStream = appConfig.getString("streams.default.stream");
        if (!"".equals(defaultStream)) {
            props.put("streams.producer.default.stream", defaultStream);
        }
        int streamBuffer = appConfig.getInt("producer.streams.buffer.max.time.ms");
        props.put("streams.buffer.max.time.ms", streamBuffer);
        return this.buildConfig(props, producerProps, producerConfigOverrides);
    }

    private NoSchemaRestProducer<byte[], byte[]> buildBinaryProducer(Map<String, Object> binaryProps) {
        return this.buildNoSchemaProducer(binaryProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    }

    private NoSchemaRestProducer<Object, Object> buildJsonProducer(Map<String, Object> jsonProps) {
        return this.buildNoSchemaProducer(jsonProps, (Serializer)new KafkaJsonSerializer(), (Serializer)new KafkaJsonSerializer());
    }

    private <K, V> NoSchemaRestProducer<K, V> buildNoSchemaProducer(Map<String, Object> props, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        keySerializer.configure(props, true);
        valueSerializer.configure(props, false);
        KafkaProducer producer = new KafkaProducer(props, keySerializer, valueSerializer);
        return new NoSchemaRestProducer(producer);
    }

    private Map<String, Object> buildAvroConfig(KafkaRestConfig appConfig, String bootstrapBrokers, Properties producerConfigOverrides) {
        HashMap<String, Object> avroDefaults = new HashMap<String, Object>();
        avroDefaults.put("bootstrap.servers", bootstrapBrokers);
        avroDefaults.put("schema.registry.url", appConfig.getString("schema.registry.url"));
        Properties producerProps = appConfig.getProducerProperties();
        return this.buildConfig(avroDefaults, producerProps, producerConfigOverrides);
    }

    private AvroRestProducer buildAvroProducer(Map<String, Object> avroProps) {
        KafkaAvroSerializer avroKeySerializer = new KafkaAvroSerializer();
        avroKeySerializer.configure(avroProps, true);
        KafkaAvroSerializer avroValueSerializer = new KafkaAvroSerializer();
        avroValueSerializer.configure(avroProps, false);
        KafkaProducer avroProducer = new KafkaProducer(avroProps, (Serializer)avroKeySerializer, (Serializer)avroValueSerializer);
        return new AvroRestProducer((KafkaProducer<Object, Object>)avroProducer, avroKeySerializer, avroValueSerializer);
    }

    private Map<String, Object> buildConfig(Map<String, Object> defaults, Properties userProps, Properties overrides) {
        HashMap<String, Object> config = new HashMap<String, Object>(defaults);
        for (String propName : userProps.stringPropertyNames()) {
            config.put(propName, userProps.getProperty(propName));
        }
        if (overrides != null) {
            for (String propName : overrides.stringPropertyNames()) {
                config.put(propName, overrides.getProperty(propName));
            }
        }
        return config;
    }

    public <K, V> void produce(String topic, Integer partition, EmbeddedFormat recordFormat, SchemaHolder schemaHolder, Collection<? extends ProduceRecord<K, V>> records, ProduceRequestCallback callback, String userName) {
        ProduceTask task = new ProduceTask(schemaHolder, records.size(), callback);
        log.trace("Starting produce task " + task.toString());
        RestProducer restProducer = null;
        if (this.isStreams) {
            if (!this.defaultStreamSet && !topic.contains(":")) {
                throw Errors.topicNotFoundException();
            }
            restProducer = this.isImpersonationEnabled ? (recordFormat.equals((Object)EmbeddedFormat.BINARY) ? this.producerCache.getBinaryProducer(userName) : this.producerCache.getJsonProducer(userName)) : this.streamsProducers.get((Object)recordFormat);
            try {
                restProducer.produce(task, topic, partition, records);
            }
            catch (RestServerErrorException e) {
                log.warn("Producer error " + (Object)((Object)e));
                throw Errors.topicPermissionException();
            }
        }
    }

    public void shutdown() {
        for (RestProducer restProducer : this.kafkaProducers.values()) {
            restProducer.close();
        }
        for (RestProducer restProducer : this.streamsProducers.values()) {
            restProducer.close();
        }
        if (this.producerCache != null) {
            this.producerCache.shutdown();
        }
    }

    class SimpleProducerCache {
        private final int maxCachesNum;
        private Queue<String> binaryOldestCache;
        private Queue<String> jsonOldestCache;
        private ConcurrentMap<String, RestProducer> binaryHighLevelCache;
        private ConcurrentMap<String, RestProducer> jsonHighLevelCache;

        SimpleProducerCache(KafkaRestConfig config) {
            this.maxCachesNum = config.getInt("producers.max.caches.num");
            this.binaryHighLevelCache = new ConcurrentHashMap<String, RestProducer>(this.maxCachesNum);
            this.jsonHighLevelCache = new ConcurrentHashMap<String, RestProducer>(this.maxCachesNum);
            this.binaryOldestCache = new ConcurrentLinkedQueue<String>();
            this.jsonOldestCache = new ConcurrentLinkedQueue<String>();
        }

        RestProducer getBinaryProducer(String userName) {
            if (this.maxCachesNum > 0) {
                RestProducer cache = (RestProducer)this.binaryHighLevelCache.get(userName);
                if (cache == null) {
                    if (this.binaryHighLevelCache.size() >= this.maxCachesNum) {
                        String eldest = this.binaryOldestCache.poll();
                        ((RestProducer)this.binaryHighLevelCache.remove(eldest)).close();
                    }
                    cache = ProducerPool.this.buildBinaryProducer(ProducerPool.this.binaryStreamProps);
                    this.binaryHighLevelCache.put(userName, cache);
                    this.binaryOldestCache.add(userName);
                }
                return cache;
            }
            return ProducerPool.this.buildBinaryProducer(ProducerPool.this.binaryStreamProps);
        }

        RestProducer getJsonProducer(String userName) {
            if (this.maxCachesNum > 0) {
                RestProducer cache = (RestProducer)this.jsonHighLevelCache.get(userName);
                if (cache == null) {
                    if (this.jsonHighLevelCache.size() >= this.maxCachesNum) {
                        String eldest = this.jsonOldestCache.poll();
                        ((RestProducer)this.jsonHighLevelCache.remove(eldest)).close();
                    }
                    cache = ProducerPool.this.buildJsonProducer(ProducerPool.this.binaryStreamProps);
                    this.jsonHighLevelCache.put(userName, cache);
                    this.jsonOldestCache.add(userName);
                }
                return cache;
            }
            return ProducerPool.this.buildJsonProducer(ProducerPool.this.binaryStreamProps);
        }

        void shutdown() {
            for (RestProducer binaryRestProducer : this.binaryHighLevelCache.values()) {
                binaryRestProducer.close();
            }
            for (RestProducer jsonRestProducer : this.jsonHighLevelCache.values()) {
                jsonRestProducer.close();
            }
        }
    }

    public static interface ProduceRequestCallback {
        public void onCompletion(Integer var1, Integer var2, List<RecordMetadataOrException> var3);
    }
}

