/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducer<K, V>
implements Producer<K, V> {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    // Source of the numeric suffix for auto-generated client ids ("producer-N") when client.id is empty.
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    // Prefix handed to the JmxReporter and to AppInfoParser register/unregister.
    private static final String JMX_PREFIX = "kafka.producer";

    // --- State below is populated by initializeProducer() only on the plain-Kafka path. ---
    private String clientId;
    private Partitioner partitioner;
    // Limits enforced by ensureValidRecordSize().
    private int maxRequestSize;
    private long totalMemorySize;
    private Metadata metadata;
    private RecordAccumulator accumulator;
    private Sender sender;
    private Metrics metrics;
    // Thread that runs the Sender; joined by close().
    private Thread ioThread;
    private CompressionType compressionType;
    // Sensor recorded once for every failed send().
    private Sensor errors;
    private Time time;

    // Serializers are resolved in the private constructor (either supplied or instantiated from config).
    private Serializer<K> keySerializer;
    private Serializer<V> valueSerializer;
    // Alias of `config`, assigned in initializeProducer(); used when building serialization error messages.
    private ProducerConfig producerConfig;
    // Upper bound passed to waitOnMetadata() and used as the budget for accumulator.append() in send().
    private long maxBlockTimeMs;
    private int requestTimeoutMs;
    // Configuration this producer was constructed with.
    private final ProducerConfig config;
    // True when calls are delegated to the reflectively loaded streams producer rather than handled here.
    private boolean isStreams;
    // Either the streams implementation or `this`; null until initialized and again after close().
    private Producer<K, V> producerDriver;
    // Set by close(); written and checked under synchronized(this) in initializeProducer() and close().
    private boolean closed;
    // Value of "streams.producer.default.stream", or null when none is configured.
    private String defaultStream = null;

    /**
     * Creates a producer from a configuration map. No serializer instances are supplied, so the
     * delegated constructor instantiates them from the "key.serializer" / "value.serializer" settings.
     *
     * @param configs producer configuration
     */
    public KafkaProducer(Map<String, Object> configs) {
        this(new ProducerConfig(configs), null, null);
    }

    /**
     * Creates a producer from a configuration map and explicit serializer instances. The serializers
     * are first merged into the configuration via {@code ProducerConfig.addSerializerToConfig}.
     *
     * @param configs         producer configuration
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     */
    public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)), keySerializer, valueSerializer);
    }

    /**
     * Creates a producer from {@link Properties}. No serializer instances are supplied, so the
     * delegated constructor instantiates them from the "key.serializer" / "value.serializer" settings.
     *
     * @param properties producer configuration
     */
    public KafkaProducer(Properties properties) {
        this(new ProducerConfig(properties), null, null);
    }

    /**
     * Creates a producer from {@link Properties} and explicit serializer instances. The serializers
     * are first merged into the configuration via {@code ProducerConfig.addSerializerToConfig}.
     *
     * @param properties      producer configuration
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     */
    public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), keySerializer, valueSerializer);
    }

    /**
     * Shared constructor behind all public constructors.
     *
     * <p>Stores the configuration, resolves both serializers (instantiating and configuring them
     * from the configuration when the caller passed {@code null}) and reads the optional
     * "streams.producer.default.stream" setting. The underlying driver is only initialized here
     * when a non-empty default stream is configured; otherwise initialization is deferred to the
     * first call that supplies a topic.
     *
     * @param config          parsed producer configuration
     * @param keySerializer   key serializer, or {@code null} to instantiate it from "key.serializer"
     * @param valueSerializer value serializer, or {@code null} to instantiate it from "value.serializer"
     */
    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        log.debug("Starting the Kafka producer");
        this.config = config;
        this.closed = false;
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance("key.serializer", Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            // The instance was supplied directly; mark the config entry as consumed.
            config.ignore("key.serializer");
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance("value.serializer", Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore("value.serializer");
            this.valueSerializer = valueSerializer;
        }

        String configuredStream = null;
        try {
            configuredStream = config.getString("streams.producer.default.stream");
        }
        catch (Exception e) {
            // Best effort: a failure to read the setting simply means no default stream is used.
            log.debug("Unable to read streams.producer.default.stream; continuing without a default stream", e);
        }
        // Compare by content, not identity: an empty value read from a properties file is a
        // distinct String instance and must still be treated as "not configured".
        this.defaultStream = configuredStream == null || configuredStream.isEmpty() ? null : configuredStream;
        if (this.defaultStream != null) {
            // The trailing ':' makes initializeProducer() select the streams implementation.
            this.initializeProducer(this.defaultStream + ":");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Lazily selects and initializes the underlying driver, based on the shape of {@code topic}.
     *
     * <p>A topic that starts with "/" or contains ":" selects the streams implementation, which is
     * loaded reflectively through {@link GenericHFactory}. Any other topic selects the built-in
     * Kafka path, for which this method builds the metrics, metadata, accumulator, network client
     * and sender, and starts the I/O thread.
     *
     * <p>The method is a no-op (apart from a log line) when the producer is already closed or
     * already initialized. The whole body runs under {@code synchronized (this)}.
     *
     * @param topic topic (or "stream:" prefix) used to decide which driver to create
     * @throws KafkaException if no bootstrap servers are configured, or if construction of the
     *                        Kafka client components fails (the partially built state is closed first)
     */
    private void initializeProducer(String topic) {
        synchronized (this) {
            if (this.closed) {
                log.error("cannot initialize producer.  already closed.");
                return;
            }
            if (this.producerDriver != null) {
                log.debug("already initlialized producer.");
                return;
            }
            if (topic.startsWith("/") || topic.contains(":")) {
                GenericHFactory producerFactory = new GenericHFactory();
                this.producerDriver = (Producer)producerFactory.getImplementorInstance("com.mapr.streams.impl.producer.MarlinProducer", new Object[]{this.config, this.keySerializer, this.valueSerializer}, ProducerConfig.class, Serializer.class, Serializer.class);
                this.isStreams = true;
                return;
            }

            // Validate the bootstrap list BEFORE publishing `this` as the driver. If validation
            // throws, producerDriver stays null and a later call can retry initialization instead
            // of running against half-initialized state.
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(this.config.getList("bootstrap.servers"));
            if (addresses.isEmpty()) {
                throw new KafkaException("Bootstrap servers not specified in configuration");
            }

            // The driver must be set before the try block: close() in the catch below only
            // releases resources when producerDriver is non-null.
            this.producerDriver = this;
            this.isStreams = false;
            try {
                log.trace("Starting the Kafka producer");
                Map<String, Object> userProvidedConfigs = this.config.originals();
                this.producerConfig = this.config;
                this.time = new SystemTime();
                MetricConfig metricConfig = new MetricConfig().samples(this.config.getInt("metrics.num.samples")).timeWindow(this.config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
                this.clientId = this.config.getString("client.id");
                if (this.clientId.length() <= 0) {
                    this.clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
                }
                List<MetricsReporter> reporters = this.config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
                reporters.add(new JmxReporter(JMX_PREFIX));
                this.metrics = new Metrics(metricConfig, reporters, this.time);
                this.partitioner = this.config.getConfiguredInstance("partitioner.class", Partitioner.class);
                long retryBackoffMs = this.config.getLong("retry.backoff.ms");
                this.metadata = new Metadata(retryBackoffMs, this.config.getLong("metadata.max.age.ms"));
                this.maxRequestSize = this.config.getInt("max.request.size");
                this.totalMemorySize = this.config.getLong("buffer.memory");
                this.compressionType = CompressionType.forName(this.config.getString("compression.type"));

                // Resolve the maximum blocking time. Precedence:
                //   1. deprecated block.on.buffer.full=true  -> block indefinitely
                //   2. deprecated metadata.fetch.timeout.ms  -> that value
                //   3. max.block.ms
                boolean blockOnBufferFull = false;
                if (userProvidedConfigs.containsKey("block.on.buffer.full")) {
                    log.warn("block.on.buffer.full config is deprecated and will be removed soon. Please use max.block.ms");
                    blockOnBufferFull = this.config.getBoolean("block.on.buffer.full");
                }
                if (blockOnBufferFull) {
                    this.maxBlockTimeMs = Long.MAX_VALUE;
                } else if (userProvidedConfigs.containsKey("metadata.fetch.timeout.ms")) {
                    log.warn("metadata.fetch.timeout.ms config is deprecated and will be removed soon. Please use max.block.ms");
                    this.maxBlockTimeMs = this.config.getLong("metadata.fetch.timeout.ms");
                } else {
                    this.maxBlockTimeMs = this.config.getLong("max.block.ms");
                }

                // The deprecated timeout.ms, when explicitly provided, wins over request.timeout.ms.
                if (userProvidedConfigs.containsKey("timeout.ms")) {
                    log.warn("timeout.ms config is deprecated and will be removed soon. Please use request.timeout.ms");
                    this.requestTimeoutMs = this.config.getInt("timeout.ms");
                } else {
                    this.requestTimeoutMs = this.config.getInt("request.timeout.ms");
                }

                LinkedHashMap<String, String> metricTags = new LinkedHashMap<String, String>();
                metricTags.put("client-id", this.clientId);
                this.accumulator = new RecordAccumulator(this.config.getInt("batch.size"), this.totalMemorySize, this.compressionType, this.config.getLong("linger.ms"), retryBackoffMs, this.metrics, this.time, metricTags);
                // Reuse the addresses validated above instead of parsing the list a second time.
                this.metadata.update(Cluster.bootstrap(addresses), this.time.milliseconds());
                ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.config.values());
                NetworkClient client = new NetworkClient((Selectable)new Selector(this.config.getLong("connections.max.idle.ms"), this.metrics, this.time, "producer", metricTags, channelBuilder), this.metadata, this.clientId, (int)this.config.getInt("max.in.flight.requests.per.connection"), (long)this.config.getLong("reconnect.backoff.ms"), (int)this.config.getInt("send.buffer.bytes"), (int)this.config.getInt("receive.buffer.bytes"), this.requestTimeoutMs, this.time);
                this.sender = new Sender(client, this.metadata, this.accumulator, this.config.getInt("max.request.size"), (short)KafkaProducer.parseAcks(this.config.getString("acks")), this.config.getInt("retries"), this.metrics, new SystemTime(), this.clientId, this.requestTimeoutMs);
                String ioThreadName = "kafka-producer-network-thread" + (this.clientId.length() > 0 ? " | " + this.clientId : "");
                this.ioThread = new KafkaThread(ioThreadName, (Runnable)this.sender, true);
                this.ioThread.start();
                this.errors = this.metrics.sensor("errors");

                // Both serializers were already resolved by the constructor, so only mark the
                // corresponding config entries as consumed before reporting unused settings.
                this.config.ignore("key.serializer");
                this.config.ignore("value.serializer");
                this.config.logUnused();
                AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId);
                log.debug("Kafka producer started");
            }
            catch (Throwable t) {
                // Release whatever was built so far; secondary close failures are swallowed so the
                // original cause is the one reported.
                this.close(0L, TimeUnit.MILLISECONDS, true);
                throw new KafkaException("Failed to construct kafka producer", t);
            }
        }
    }

    /**
     * Converts the textual "acks" setting into its numeric form.
     *
     * @param acksString configured value; surrounding whitespace is ignored
     * @return {@code -1} when the value is "all" (case-insensitive), otherwise the parsed integer
     * @throws ConfigException if the value is neither "all" nor an integer
     */
    private static int parseAcks(String acksString) {
        String trimmed = acksString.trim();
        if (trimmed.toLowerCase().equals("all")) {
            return -1;
        }
        try {
            return Integer.parseInt(trimmed);
        }
        catch (NumberFormatException e) {
            throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
        }
    }

    /**
     * Qualifies the record's topic with the configured default stream when applicable.
     *
     * <p>The record is returned unchanged when no default stream is configured or when its topic
     * already starts with "/". Otherwise a new record targeting
     * {@code defaultStream + ":" + topic} is created, carrying over the key, the value and, if
     * set, the explicit partition.
     *
     * @param record record about to be sent
     * @return the original record, or a copy with a stream-qualified topic
     */
    private ProducerRecord<K, V> addDefaultStreamNameIfNeeded(ProducerRecord<K, V> record) {
        if (this.defaultStream == null) {
            return record;
        }
        if (record.topic().startsWith("/")) {
            return record;
        }
        String qualifiedTopic = this.defaultStream + ":" + record.topic();
        if (record.partition() == null) {
            return new ProducerRecord<K, V>(qualifiedTopic, record.key(), record.value());
        }
        return new ProducerRecord<K, V>(qualifiedTopic, record.partition(), record.key(), record.value());
    }

    /**
     * Sends a record without a completion callback.
     *
     * @param record record to send
     * @return future for the record's metadata
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        // Same path as the two-argument overload, with no callback.
        return send(record, null);
    }

    /**
     * Sends a record, initializing the underlying driver on first use.
     *
     * <p>When the streams driver is active the record is (optionally) qualified with the default
     * stream and handed to that driver. Otherwise the record is serialized, assigned a partition,
     * size-checked and appended to the accumulator; the sender is woken when a batch fills up or a
     * new batch is created.
     *
     * @param record   record to send
     * @param callback invoked on completion; may be {@code null}
     * @return future for the record's metadata; an already-failed future when the producer is
     *         closed or an {@link ApiException} occurs
     * @throws InterruptException       if the calling thread is interrupted while blocked
     * @throws SerializationException   if the key or value cannot be handled by its serializer
     * @throws BufferExhaustedException as propagated from the accumulator
     * @throws KafkaException           for any other Kafka error that is not an {@link ApiException}
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        if (this.producerDriver == null) {
            this.initializeProducer(record.topic());
        }
        // Read the field exactly once: a concurrent close() sets it back to null, and re-reading
        // it after the null check could otherwise end in a NullPointerException.
        Producer<K, V> driver = this.producerDriver;
        if (driver == null) {
            if (callback != null) {
                callback.onCompletion(null, new IllegalStateException("producer closed, cannot send"));
            }
            return new FutureFailure(new ApiException("producer closed, cannot send"));
        }
        if (this.isStreams) {
            return driver.send(this.addDefaultStreamNameIfNeeded(record), callback);
        }
        try {
            // Time spent waiting for metadata is deducted from the budget for the append below.
            long waitedOnMetadataMs = this.waitOnMetadata(record.topic(), this.maxBlockTimeMs);
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - waitedOnMetadataMs);

            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.key());
            }
            catch (ClassCastException cce) {
                // Keep the original exception as the cause so the failing cast stays visible.
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.value());
            }
            catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", cce);
            }

            int partition = this.partition(record, serializedKey, serializedValue, this.metadata.fetch());
            // 12 is added on top of the record size; presumably the log-entry overhead — TODO confirm.
            int serializedSize = 12 + Record.recordSize(serializedKey, serializedValue);
            this.ensureValidRecordSize(serializedSize);
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            log.trace("Sending record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
            RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        }
        catch (ApiException e) {
            // API-level failures are reported through the callback and the returned future
            // instead of being thrown to the caller.
            log.debug("Exception occurred during message send:", e);
            if (callback != null) {
                callback.onCompletion(null, e);
            }
            this.errors.record();
            return new FutureFailure(e);
        }
        catch (InterruptedException e) {
            this.errors.record();
            throw new InterruptException(e);
        }
        catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            throw e;
        }
        catch (KafkaException e) {
            this.errors.record();
            throw e;
        }
    }

    /**
     * Blocks until partition information for {@code topic} is present in the cached cluster
     * metadata, requesting metadata updates as needed.
     *
     * @param topic     topic whose metadata is required; registered with the metadata if unknown
     * @param maxWaitMs total time budget in milliseconds
     * @return milliseconds spent waiting, or {@code 0} if the metadata was already available
     * @throws InterruptedException        as propagated from {@code metadata.awaitUpdate}
     * @throws TimeoutException            if the budget is used up before the metadata arrives
     * @throws TopicAuthorizationException if the refreshed metadata lists the topic as unauthorized
     */
    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
        if (!this.metadata.containsTopic(topic)) {
            this.metadata.add(topic);
        }
        boolean alreadyKnown = this.metadata.fetch().partitionsForTopic(topic) != null;
        if (alreadyKnown) {
            return 0L;
        }

        final long startMs = this.time.milliseconds();
        long waitBudgetMs = maxWaitMs;
        while (this.metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            int updateVersion = this.metadata.requestUpdate();
            // Wake the sender so the update request is acted on promptly.
            this.sender.wakeup();
            this.metadata.awaitUpdate(updateVersion, waitBudgetMs);

            long elapsedMs = this.time.milliseconds() - startMs;
            if (elapsedMs >= maxWaitMs) {
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            if (this.metadata.fetch().unauthorizedTopics().contains(topic)) {
                throw new TopicAuthorizationException(topic);
            }
            // Shrink the budget for the next round by the time already consumed.
            waitBudgetMs = maxWaitMs - elapsedMs;
        }
        return this.time.milliseconds() - startMs;
    }

    /**
     * Rejects records that can never be sent because they exceed a configured limit.
     *
     * @param size2 serialized size of the record in bytes
     * @throws RecordTooLargeException if the size exceeds "max.request.size" or "buffer.memory"
     */
    private void ensureValidRecordSize(int size2) {
        final String prefix = "The message is " + size2 + " bytes when serialized which is larger than the ";
        boolean exceedsRequestLimit = size2 > this.maxRequestSize;
        if (exceedsRequestLimit) {
            throw new RecordTooLargeException(prefix + "maximum request size you have configured with the max.request.size configuration.");
        }
        boolean exceedsBufferMemory = (long)size2 > this.totalMemorySize;
        if (exceedsBufferMemory) {
            throw new RecordTooLargeException(prefix + "total memory buffer you have configured with the buffer.memory configuration.");
        }
    }

    /**
     * Flushes buffered records.
     *
     * <p>Does nothing (beyond logging) when the producer has not been initialized. With the streams
     * driver the call is delegated; otherwise a flush is started on the accumulator, the sender is
     * woken, and the call waits for the flush to complete.
     *
     * @throws InterruptException if the calling thread is interrupted while waiting
     */
    @Override
    public void flush() {
        log.trace("Flushing accumulated records in producer.");
        if (this.producerDriver == null) {
            log.info("producer not initialized, cannot flush.");
            return;
        }
        if (this.isStreams) {
            this.producerDriver.flush();
            return;
        }
        this.accumulator.beginFlush();
        this.sender.wakeup();
        try {
            this.accumulator.awaitFlushCompletion();
        }
        catch (InterruptedException e) {
            throw new InterruptException("Flush interrupted.", e);
        }
    }

    /**
     * Returns partition information for a topic, initializing the driver on first use.
     *
     * @param topic topic to look up
     * @return the partitions of the topic, or {@code null} when the producer is closed
     * @throws InterruptException if the calling thread is interrupted while waiting for metadata
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        if (this.producerDriver == null) {
            this.initializeProducer(topic);
        }
        // Still null after the attempt above: initialization was refused because the producer is closed.
        if (this.producerDriver == null) {
            log.error("producer closed, cannot get partitionsFor " + topic);
            return null;
        }
        if (!this.isStreams) {
            try {
                this.waitOnMetadata(topic, this.maxBlockTimeMs);
            }
            catch (InterruptedException e) {
                throw new InterruptException(e);
            }
            return this.metadata.fetch().partitionsForTopic(topic);
        }
        return this.producerDriver.partitionsFor(topic);
    }

    /**
     * Returns the producer's metrics.
     *
     * @return the streams driver's metrics when it is active, otherwise an unmodifiable view of
     *         this producer's metrics; {@code null} when the producer has not been initialized
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        if (this.producerDriver == null) {
            log.info("producer not initialized, cannot get metrics");
            return null;
        }
        return this.isStreams
                ? this.producerDriver.metrics()
                : Collections.unmodifiableMap(this.metrics.metrics());
    }

    /**
     * Closes the producer with the largest possible timeout ({@code Long.MAX_VALUE} milliseconds).
     */
    @Override
    public void close() {
        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes the producer, waiting up to the given timeout. Failures encountered while closing
     * are reported rather than swallowed.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     */
    @Override
    public void close(long timeout, TimeUnit timeUnit) {
        final boolean swallowException = false;
        close(timeout, timeUnit, swallowException);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the producer and releases its resources. Calls after the first are no-ops.
     *
     * <p>If the producer was never initialized it is only marked closed. With the streams driver
     * the close is delegated (always with {@code Long.MAX_VALUE} milliseconds; the supplied
     * timeout is not forwarded). On the Kafka path the sender is asked to close gracefully, the
     * I/O thread is joined for up to the timeout, a force-close follows if the thread is still
     * alive, and finally the metrics and both serializers are closed.
     *
     * @param timeout          maximum time to wait for a graceful shutdown; must not be negative
     *                         on the Kafka path
     * @param timeUnit         unit of {@code timeout}
     * @param swallowException when {@code true}, failures collected while closing are not rethrown
     * @throws IllegalArgumentException if {@code timeout} is negative on the Kafka path; the
     *                                  producer is left open in that case
     * @throws KafkaException           if closing failed and {@code swallowException} is {@code false}
     */
    private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
        Producer<K, V> producerDriverToClose;
        synchronized (this) {
            if (this.closed) {
                return;
            }
            if (this.producerDriver == null) {
                // Never initialized: nothing to release, just refuse future initialization.
                this.closed = true;
                return;
            }
            // Validate BEFORE flipping any state. Otherwise an invalid argument would mark the
            // producer closed without releasing the I/O thread or metrics, and no later close()
            // could recover them.
            if (!this.isStreams && timeout < 0L) {
                throw new IllegalArgumentException("The timeout cannot be negative.");
            }
            this.closed = true;
            producerDriverToClose = this.producerDriver;
            this.producerDriver = null;
        }

        if (this.isStreams) {
            producerDriverToClose.close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            return;
        }

        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        // close() invoked from a callback runs on the I/O thread itself; joining would self-block.
        boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
        if (timeout > 0L) {
            if (invokedFromCallback) {
                log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
            } else {
                if (this.sender != null) {
                    this.sender.initiateClose();
                }
                if (this.ioThread != null) {
                    try {
                        this.ioThread.join(timeUnit.toMillis(timeout));
                    }
                    catch (InterruptedException t) {
                        firstException.compareAndSet(null, t);
                        log.error("Interrupted while joining ioThread", t);
                    }
                }
            }
        }
        // Graceful shutdown did not finish (or was skipped): force the sender to stop.
        if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
            log.info("Proceeding to force close the producer since pending requests could not be completed within timeout {} ms.", timeout);
            this.sender.forceClose();
            if (!invokedFromCallback) {
                try {
                    this.ioThread.join();
                }
                catch (InterruptedException e) {
                    firstException.compareAndSet(null, e);
                }
            }
        }
        ClientUtils.closeQuietly(this.metrics, "producer metrics", firstException);
        ClientUtils.closeQuietly(this.keySerializer, "producer keySerializer", firstException);
        ClientUtils.closeQuietly(this.valueSerializer, "producer valueSerializer", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId);
        log.debug("The Kafka producer has closed.");
        if (firstException.get() != null && !swallowException) {
            throw new KafkaException("Failed to close kafka producer", firstException.get());
        }
    }

    /**
     * Determines the partition a record is sent to.
     *
     * <p>A partition set explicitly on the record is validated against the topic's partition
     * count and used as-is; otherwise the configured {@link Partitioner} decides.
     *
     * @param record          record being sent
     * @param serializedKey   serialized key bytes
     * @param serializedValue serialized value bytes
     * @param cluster         cluster metadata used for validation and partitioning
     * @return the target partition
     * @throws IllegalArgumentException if the record's explicit partition is negative or not
     *                                  below the topic's partition count
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer requested = record.partition();
        if (requested == null) {
            return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        }
        int partitionCount = cluster.partitionsForTopic(record.topic()).size();
        boolean inRange = requested >= 0 && requested < partitionCount;
        if (!inRange) {
            throw new IllegalArgumentException("Invalid partition given with record: " + requested + " is not in the range [0..." + partitionCount + "].");
        }
        return requested;
    }

    /**
     * A {@link Future} that is already complete and always fails: both {@code get} variants throw
     * an {@link ExecutionException} wrapping the exception given at construction time.
     */
    private static class FutureFailure
    implements Future<RecordMetadata> {
        /** Created eagerly in the constructor and rethrown by every {@code get} call. */
        private final ExecutionException exception;

        /**
         * @param exception the failure to report; becomes the cause of the thrown
         *                  {@link ExecutionException}
         */
        public FutureFailure(Exception exception) {
            this.exception = new ExecutionException(exception);
        }

        /** Always complete. */
        @Override
        public boolean isDone() {
            return true;
        }

        /** Never cancelled. */
        @Override
        public boolean isCancelled() {
            return false;
        }

        /** Cancellation is not possible; always returns {@code false}. */
        @Override
        public boolean cancel(boolean interrupt) {
            return false;
        }

        /** Always throws the stored {@link ExecutionException}. */
        @Override
        public RecordMetadata get() throws ExecutionException {
            throw this.exception;
        }

        /** Always throws the stored {@link ExecutionException}; the timeout is ignored. */
        @Override
        public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
            throw this.exception;
        }
    }
}

