/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.metrics;

import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.Time;
import io.confluent.ksql.metrics.MetricCollector;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.metrics.TopicSensors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;

public class ConsumerCollector
implements MetricCollector {
    private final Map<String, TopicSensors> topicSensors = new HashMap<String, TopicSensors>();
    private Metrics metrics;
    private String id;
    private String groupId;
    private Time time;

    @Override
    public void configure(Map<String, ?> map) {
        String id = (String)map.get("group.id");
        if (id != null) {
            this.groupId = id;
        }
        if (id == null) {
            id = (String)map.get("client.id");
        }
        if (id.contains("")) {
            this.configure(MetricCollectors.getMetrics(), MetricCollectors.addCollector(id, this), MetricCollectors.getTime());
        }
    }

    ConsumerCollector configure(Metrics metrics, String id, Time time) {
        this.id = id;
        this.metrics = metrics;
        this.time = time;
        return this;
    }

    @Override
    public String getGroupId() {
        return this.groupId;
    }

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public ConsumerRecords onConsume(ConsumerRecords records) {
        this.collect(records);
        return records;
    }

    private void collect(ConsumerRecords consumerRecords) {
        Stream<ConsumerRecord> stream = StreamSupport.stream(consumerRecords.spliterator(), false);
        stream.forEach(record -> this.record(record.topic().toLowerCase(), false, (ConsumerRecord)record));
    }

    @Override
    public void recordError(String topic) {
        this.record(topic, true, null);
    }

    private void record(String topic, boolean isError, ConsumerRecord record) {
        this.topicSensors.computeIfAbsent(this.getCounterKey(topic), k -> new TopicSensors(topic, this.buildSensors((String)k))).increment(record, isError);
    }

    private String getCounterKey(String topic) {
        return topic;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<TopicSensors.SensorMetric<ConsumerRecord>> buildSensors(String key) {
        ArrayList<TopicSensors.SensorMetric<ConsumerRecord>> sensors = new ArrayList<TopicSensors.SensorMetric<ConsumerRecord>>();
        Metrics metrics = this.metrics;
        synchronized (metrics) {
            this.addSensor(key, "messages-per-sec", (MeasurableStat)new Rate(), sensors, false);
            this.addSensor(key, "c-total-messages", (MeasurableStat)new Total(), sensors, false);
            this.addSensor(key, "c-failed-messages", (MeasurableStat)new Total(), sensors, true);
            this.addSensor(key, "c-total-message-bytes", (MeasurableStat)new Total(), sensors, false, r -> {
                if (r == null) {
                    return 0.0;
                }
                return (double)r.serializedValueSize() + (double)r.serializedKeySize();
            });
            this.addSensor(key, "failed-messages-per-sec", (MeasurableStat)new Rate(), sensors, true);
        }
        return sensors;
    }

    private void addSensor(String key, String metricNameString, MeasurableStat stat, List<TopicSensors.SensorMetric<ConsumerRecord>> sensors, boolean isError) {
        this.addSensor(key, metricNameString, stat, sensors, isError, r -> 1.0);
    }

    private void addSensor(String key, String metricNameString, MeasurableStat stat, List<TopicSensors.SensorMetric<ConsumerRecord>> sensors, boolean isError, final Function<ConsumerRecord, Double> recordValue) {
        String name = "cons-" + key + "-" + metricNameString + "-" + this.id;
        MetricName metricName = new MetricName(metricNameString, "consumer-metrics", "consumer-" + name, (Map)ImmutableMap.of((Object)"key", (Object)key, (Object)"id", (Object)this.id));
        Sensor existingSensor = this.metrics.getSensor(name);
        final Sensor sensor = this.metrics.sensor(name);
        if (existingSensor == null || this.metrics.metrics().get(metricName) == null) {
            sensor.add(metricName, stat);
        }
        KafkaMetric metric = (KafkaMetric)this.metrics.metrics().get(metricName);
        sensors.add(new TopicSensors.SensorMetric<ConsumerRecord>(sensor, metric, this.time, isError){

            @Override
            void record(ConsumerRecord record) {
                sensor.record(((Double)recordValue.apply(record)).doubleValue());
                super.record(record);
            }
        });
    }

    @Override
    public void close() {
        MetricCollectors.remove(this.id);
        this.topicSensors.values().forEach(v -> v.close(this.metrics));
    }

    @Override
    public Collection<TopicSensors.Stat> stats(String topic, boolean isError) {
        ArrayList<TopicSensors.Stat> list = new ArrayList<TopicSensors.Stat>();
        this.topicSensors.values().stream().filter(counter -> counter.isTopic(topic)).forEach(record -> list.addAll(record.stats(isError)));
        return list;
    }

    @Override
    public double currentMessageConsumptionRate() {
        ArrayList allStats = new ArrayList();
        this.topicSensors.values().forEach(record -> allStats.addAll(record.stats(false)));
        return allStats.stream().filter(stat -> stat.name().contains("messages-per-sec")).mapToDouble(TopicSensors.Stat::getValue).sum();
    }

    @Override
    public double totalMessageConsumption() {
        ArrayList allStats = new ArrayList();
        this.topicSensors.values().forEach(record -> allStats.addAll(record.stats(false)));
        return allStats.stream().filter(stat -> stat.name().contains("c-total-messages")).mapToDouble(TopicSensors.Stat::getValue).sum();
    }

    @Override
    public double totalBytesConsumption() {
        ArrayList allStats = new ArrayList();
        this.topicSensors.values().forEach(record -> allStats.addAll(record.stats(false)));
        return allStats.stream().filter(stat -> stat.name().contains("c-total-message-bytes")).mapToDouble(TopicSensors.Stat::getValue).sum();
    }

    @Override
    public double errorRate() {
        ArrayList allStats = new ArrayList();
        this.topicSensors.values().forEach(record -> allStats.addAll(record.errorRateStats()));
        return allStats.stream().mapToDouble(TopicSensors.Stat::getValue).sum();
    }

    public String toString() {
        return this.getClass().getSimpleName() + " id:" + this.id + " " + this.topicSensors.keySet();
    }
}

