/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.mapr.AbstractProducer;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MapRKafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducer<K, V>
implements Producer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private final ProducerConfig config;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private Producer<K, V> producerDriver;
    private boolean closed;

    public KafkaProducer(Map<String, Object> configs) {
        this(new ProducerConfig(configs), null, null);
    }

    public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)), keySerializer, valueSerializer);
    }

    public KafkaProducer(Properties properties) {
        this(new ProducerConfig(properties), null, null);
    }

    public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), keySerializer, valueSerializer);
    }

    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        log.debug("Starting the Kafka producer");
        this.config = config;
        this.closed = false;
        this.keySerializer = keySerializer == null ? this.config.getConfiguredInstance("key.serializer", Serializer.class) : keySerializer;
        this.valueSerializer = valueSerializer == null ? this.config.getConfiguredInstance("value.serializer", Serializer.class) : valueSerializer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initializeProducer(String topic) {
        KafkaProducer kafkaProducer = this;
        synchronized (kafkaProducer) {
            if (this.closed) {
                log.error("cannot initialize producer.  already closed.");
                return;
            }
            if (this.producerDriver != null) {
                log.debug("already initlialized producer.");
                return;
            }
            if (topic.startsWith("/") || topic.contains(":")) {
                AbstractProducer ap;
                GenericHFactory producerFactory = new GenericHFactory();
                this.producerDriver = ap = (AbstractProducer)producerFactory.getImplementorInstance("com.mapr.fs.marlin.producer.MarlinProducer", new Object[]{this.config, this.keySerializer, this.valueSerializer}, ProducerConfig.class, Serializer.class, Serializer.class);
            } else {
                List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(this.config.getList("bootstrap.servers"));
                if (addresses.size() == 0 || addresses.get(0).equals("")) {
                    throw new KafkaException("Bootstrap servers not specified in configuration");
                }
                this.producerDriver = new MapRKafkaProducer<K, V>(this.config, this.keySerializer, this.valueSerializer);
            }
        }
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, null);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        if (this.producerDriver == null) {
            this.initializeProducer(record.topic());
        }
        if (this.producerDriver == null) {
            if (callback != null) {
                callback.onCompletion(null, new IllegalStateException("producer closed, cannot send"));
            }
            return new FutureFailure(new ApiException("producer closed, cannot send"));
        }
        return this.producerDriver.send(record, callback);
    }

    @Override
    public void flush() {
        if (this.producerDriver == null) {
            log.error("producer not initialized, cannot flush.");
            return;
        }
        this.producerDriver.flush();
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        if (this.producerDriver == null) {
            this.initializeProducer(topic);
        }
        if (this.producerDriver == null) {
            log.error("producer closed, cannot get partitionsFor " + topic);
            return null;
        }
        return this.producerDriver.partitionsFor(topic);
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        if (this.producerDriver == null) {
            log.error("producer not initialized, cannot get metrics");
            return null;
        }
        return this.producerDriver.metrics();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Object producerDriverToClose = null;
        KafkaProducer kafkaProducer = this;
        synchronized (kafkaProducer) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.producerDriver == null) {
                return;
            }
        }
        this.producerDriver.close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(long timeout, TimeUnit timeUnit) {
        Object producerDriverToClose = null;
        KafkaProducer kafkaProducer = this;
        synchronized (kafkaProducer) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.producerDriver == null) {
                return;
            }
        }
        this.producerDriver.close(timeout, timeUnit);
    }

    private static class FutureFailure
    implements Future<RecordMetadata> {
        private final ExecutionException exception;

        public FutureFailure(Exception exception) {
            this.exception = new ExecutionException(exception);
        }

        @Override
        public boolean cancel(boolean interrupt) {
            return false;
        }

        @Override
        public RecordMetadata get() throws ExecutionException {
            throw this.exception;
        }

        @Override
        public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
            throw this.exception;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return true;
        }
    }
}

