/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.metrics;

import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.Time;
import io.confluent.ksql.metrics.MetricCollector;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.metrics.TopicSensors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;

/**
 * {@link MetricCollector} that counts messages and errors per topic for a producer.
 *
 * <p>For every topic seen, four Kafka sensors are registered in the shared {@link Metrics}
 * registry: {@code messages-per-sec}, {@code total-messages}, {@code failed-messages} and
 * {@code failed-messages-per-sec}. Topic names are lower-cased before being used as keys.
 *
 * <p>NOTE(review): {@code topicSensors} is a plain {@link HashMap}; confirm that callers
 * serialize access if {@link #onSend} can be invoked from several threads.
 */
public class ProducerCollector implements MetricCollector {
    /** Per-topic sensor groups, keyed by the lower-cased topic name. */
    private final Map<String, TopicSensors> topicSensors = new HashMap<>();
    private Metrics metrics;
    private String id;
    private Time time;

    /**
     * Configures this collector from client configuration.
     *
     * <p>Reads {@code client.id} from the supplied map, registers this instance with
     * {@link MetricCollectors} and uses the id returned by that registration.
     *
     * @param map client configuration; the {@code client.id} entry is cast to {@link String}
     */
    @Override
    public void configure(Map<String, ?> map) {
        String clientId = (String) map.get("client.id");
        this.configure(
                MetricCollectors.getMetrics(),
                MetricCollectors.addCollector(clientId, this),
                MetricCollectors.getTime());
    }

    /**
     * Sets the collaborators directly, without registering with {@link MetricCollectors}.
     *
     * @param metrics registry in which sensors are created
     * @param id      identifier embedded in sensor and metric names
     * @param time    clock handed to every created {@link TopicSensors.SensorMetric}
     * @return this collector, to allow chaining
     */
    ProducerCollector configure(Metrics metrics, String id, Time time) {
        this.id = id;
        this.metrics = metrics;
        this.time = time;
        return this;
    }

    /** Returns the id assigned in {@code configure}. */
    @Override
    public String getId() {
        return this.id;
    }

    /**
     * Counts one successful send against the record's topic.
     *
     * @param record the record being sent
     * @return the same {@code record} instance, unmodified
     */
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        this.collect(record, false);
        return record;
    }

    /**
     * Counts one error against the given topic.
     *
     * @param topic topic name; lower-cased before lookup
     */
    @Override
    public void recordError(String topic) {
        this.collect(true, topic.toLowerCase());
    }

    /** Extracts the lower-cased topic from {@code record} and delegates to the topic overload. */
    private void collect(ProducerRecord record, boolean isError) {
        this.collect(isError, record.topic().toLowerCase());
    }

    /**
     * Increments the sensors for {@code topic}, creating them on first use.
     *
     * <p>The record passed to {@code increment} is always {@code null}; only the event count
     * and the error flag are forwarded.
     */
    private void collect(boolean isError, String topic) {
        TopicSensors sensorsForTopic = this.topicSensors.computeIfAbsent(
                this.getKey(topic),
                key -> new TopicSensors(topic, this.buildSensors(key)));
        sensorsForTopic.increment(null, isError);
    }

    /**
     * Creates the four sensor/metric pairs for one topic key.
     *
     * <p>Registration happens while holding the monitor of the {@link Metrics} registry, so
     * the exists-check and add performed in {@code addSensor} are not interleaved with another
     * thread that synchronizes on the same registry.
     *
     * @param key topic key the sensors belong to
     * @return the created sensor metrics, in registration order
     */
    private List<TopicSensors.SensorMetric<ProducerRecord>> buildSensors(String key) {
        List<TopicSensors.SensorMetric<ProducerRecord>> sensors = new ArrayList<>();
        synchronized (this.metrics) {
            this.addSensor(key, "messages-per-sec", new Rate(), sensors, false);
            this.addSensor(key, "total-messages", new Total(), sensors, false);
            this.addSensor(key, "failed-messages", new Total(), sensors, true);
            this.addSensor(key, "failed-messages-per-sec", new Rate(), sensors, true);
        }
        return sensors;
    }

    /**
     * Registers (or reuses) one sensor and appends its wrapper to {@code results}.
     *
     * @param key              topic key, used in the sensor name and as the {@code key} tag
     * @param metricNameString metric name, e.g. {@code total-messages}
     * @param stat             statistic attached to the sensor when the metric is not yet present
     * @param results          list that receives the new {@link TopicSensors.SensorMetric}
     * @param isError          whether the metric tracks errors
     */
    private void addSensor(String key, String metricNameString, MeasurableStat stat, List<TopicSensors.SensorMetric<ProducerRecord>> results, boolean isError) {
        String name = "prod-" + key + "-" + metricNameString + "-" + this.id;
        MetricName metricName = new MetricName(
                metricNameString,
                "producer-metrics",
                "producer-" + name,
                ImmutableMap.of("key", key, "id", this.id));

        // Look the sensor up before metrics.sensor(name), which returns a sensor either way.
        Sensor existingSensor = this.metrics.getSensor(name);
        final Sensor sensor = this.metrics.sensor(name);

        // Attach the stat only if the sensor is new or the metric is not registered yet.
        if (existingSensor == null || this.metrics.metrics().get(metricName) == null) {
            sensor.add(metricName, stat);
        }

        KafkaMetric metric = this.metrics.metrics().get(metricName);
        results.add(new TopicSensors.SensorMetric<ProducerRecord>(sensor, metric, this.time, isError) {

            @Override
            void record(ProducerRecord record) {
                // Every event counts as exactly one message, whatever the record holds.
                sensor.record(1.0);
                super.record(record);
            }
        });
    }

    /** Maps a topic name to its key in {@code topicSensors}; currently the identity. */
    private String getKey(String topic) {
        return topic;
    }

    /** Unregisters this collector from {@link MetricCollectors} and closes every topic's sensors. */
    @Override
    public void close() {
        MetricCollectors.remove(this.id);
        for (TopicSensors sensors : this.topicSensors.values()) {
            sensors.close(this.metrics);
        }
    }

    /**
     * Collects the stats of every sensor group that matches {@code topic}.
     *
     * @param topic   topic name, matched through {@code TopicSensors.isTopic}
     * @param isError selects error stats ({@code true}) or non-error stats ({@code false})
     * @return a new list holding the matching stats; empty when nothing matches
     */
    @Override
    public Collection<TopicSensors.Stat> stats(String topic, boolean isError) {
        List<TopicSensors.Stat> matching = new ArrayList<>();
        for (TopicSensors sensors : this.topicSensors.values()) {
            if (sensors.isTopic(topic)) {
                matching.addAll(sensors.stats(isError));
            }
        }
        return matching;
    }

    /**
     * Sums, across all topics, the values of the non-error stats whose name contains
     * {@code messages-per-sec}.
     */
    @Override
    public double currentMessageProductionRate() {
        // Typed list: a raw list would make the stream elements Object and break the lambdas.
        List<TopicSensors.Stat> allStats = new ArrayList<>();
        this.topicSensors.values().forEach(sensors -> allStats.addAll(sensors.stats(false)));
        return allStats.stream()
                .filter(stat -> stat.name().contains("messages-per-sec"))
                .mapToDouble(TopicSensors.Stat::getValue)
                .sum();
    }

    /** Sums, across all topics, the values reported by {@code TopicSensors.errorRateStats()}. */
    @Override
    public double errorRate() {
        List<TopicSensors.Stat> allStats = new ArrayList<>();
        this.topicSensors.values().forEach(sensors -> allStats.addAll(sensors.errorRateStats()));
        return allStats.stream()
                .mapToDouble(TopicSensors.Stat::getValue)
                .sum();
    }

    /** Returns the simple class name, the collector id and the per-topic sensor map. */
    @Override
    public String toString() {
        return this.getClass().getSimpleName() + " " + this.id + " " + this.topicSensors.toString();
    }
}

