/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.internal;

import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Value;

public class KsqlEngineMetrics
implements Closeable {
    private final List<Sensor> sensors;
    private final String metricGroupName;
    private final Sensor numActiveQueries;
    private final Sensor messagesIn;
    private final Sensor totalMessagesIn;
    private final Sensor totalBytesIn;
    private final Sensor messagesOut;
    private final Sensor numIdleQueries;
    private final Sensor messageConsumptionByQuery;
    private final Sensor errorRate;
    private final KsqlEngine ksqlEngine;

    public KsqlEngineMetrics(String metricGroupPrefix, KsqlEngine ksqlEngine) {
        this.ksqlEngine = ksqlEngine;
        this.sensors = new ArrayList<Sensor>();
        this.metricGroupName = metricGroupPrefix + "-query-stats";
        Metrics metrics = MetricCollectors.getMetrics();
        this.numActiveQueries = this.configureNumActiveQueries(metrics);
        this.messagesIn = this.configureMessagesIn(metrics);
        this.totalMessagesIn = this.configureTotalMessagesIn(metrics);
        this.totalBytesIn = this.configureTotalBytesIn(metrics);
        this.messagesOut = this.configureMessagesOut(metrics);
        this.numIdleQueries = this.configureIdleQueriesSensor(metrics);
        this.messageConsumptionByQuery = this.configureMessageConsumptionByQuerySensor(metrics);
        this.errorRate = this.configureErrorRate(metrics);
    }

    @Override
    public void close() {
        Metrics metrics = MetricCollectors.getMetrics();
        this.sensors.forEach(sensor -> metrics.removeSensor(sensor.name()));
    }

    public void updateMetrics() {
        this.recordMessagesConsumed(MetricCollectors.currentConsumptionRate());
        this.recordTotalMessagesConsumed(MetricCollectors.totalMessageConsumption());
        this.recordTotalBytesConsumed(MetricCollectors.totalBytesConsumption());
        this.recordMessagesProduced(MetricCollectors.currentProductionRate());
        this.recordMessageConsumptionByQueryStats(MetricCollectors.currentConsumptionRateByQuery());
        this.recordErrorRate(MetricCollectors.currentErrorRate());
    }

    List<Sensor> registeredSensors() {
        return this.sensors;
    }

    private void recordMessageConsumptionByQueryStats(Collection<Double> messagesConsumedByQuery) {
        this.numIdleQueries.record((double)messagesConsumedByQuery.stream().filter(value -> value == 0.0).count());
        messagesConsumedByQuery.forEach(arg_0 -> ((Sensor)this.messageConsumptionByQuery).record(arg_0));
    }

    private void recordMessagesProduced(double value) {
        this.messagesOut.record(value);
    }

    private void recordMessagesConsumed(double value) {
        this.messagesIn.record(value);
    }

    private void recordTotalBytesConsumed(double value) {
        this.totalBytesIn.record(value);
    }

    private void recordTotalMessagesConsumed(double value) {
        this.totalMessagesIn.record(value);
    }

    private void recordErrorRate(double value) {
        this.errorRate.record(value);
    }

    private Sensor configureErrorRate(Metrics metrics) {
        Sensor sensor = this.createSensor(metrics, this.metricGroupName + "-error-rate");
        sensor.add(metrics.metricName("error-rate", this.metricGroupName, "The number of messages which were consumed but not processed. Messages may not be processed if, for instance, the message contents could not be deserialized due to an incompatible schema. Alternately, a consumed messages may not have been produced, hence being effectively dropped. Such messages would also be counted toward the error rate."), (MeasurableStat)new Value());
        return sensor;
    }

    private Sensor configureMessagesOut(Metrics metrics) {
        Sensor sensor = this.createSensor(metrics, this.metricGroupName + "-messages-produced");
        sensor.add(metrics.metricName("messages-produced-per-sec", this.metricGroupName, "The number of messages produced per second across all queries"), (MeasurableStat)new Value());
        return sensor;
    }

    private Sensor configureMessagesIn(Metrics metrics) {
        Sensor sensor = this.createSensor(metrics, this.metricGroupName + "-messages-consumed");
        sensor.add(metrics.metricName("messages-consumed-per-sec", this.metricGroupName, "The number of messages consumed per second across all queries"), (MeasurableStat)new Value());
        return sensor;
    }

    private Sensor configureTotalMessagesIn(Metrics metrics) {
        Sensor sensor = this.createSensor(metrics, this.metricGroupName + "-total-messages-consumed");
        sensor.add(metrics.metricName("messages-consumed-total", this.metricGroupName, "The total number of messages consumed across all queries"), (MeasurableStat)new Value());
        return sensor;
    }

    private Sensor configureTotalBytesIn(Metrics metrics) {
        Sensor sensor = this.createSensor(metrics, this.metricGroupName + "-total-bytes-consumed");
        sensor.add(metrics.metricName("bytes-consumed-total", this.metricGroupName, "The total number of bytes consumed across all queries"), (MeasurableStat)new Value());
        return sensor;
    }

    private Sensor configureNumActiveQueries(Metrics metrics) {
        Sensor sensor = this.createSensor(metrics, this.metricGroupName + "-active-queries");
        sensor.add(metrics.metricName("num-active-queries", this.metricGroupName, "The current number of active queries running in this engine"), new MeasurableStat(){

            public double measure(MetricConfig metricConfig, long l) {
                return KsqlEngineMetrics.this.ksqlEngine.numberOfLiveQueries();
            }

            public void record(MetricConfig metricConfig, double v, long l) {
            }
        });
        sensor.add(metrics.metricName("num-persistent-queries", this.metricGroupName, "The current number of persistent queries running in this engine"), new MeasurableStat(){

            public double measure(MetricConfig metricConfig, long l) {
                return KsqlEngineMetrics.this.ksqlEngine.numberOfPersistentQueries();
            }

            public void record(MetricConfig metricConfig, double v, long l) {
            }
        });
        return sensor;
    }

    private Sensor configureIdleQueriesSensor(Metrics metrics) {
        Sensor sensor = this.createSensor(metrics, "num-idle-queries");
        sensor.add(metrics.metricName("num-idle-queries", this.metricGroupName), (MeasurableStat)new Value());
        return sensor;
    }

    private Sensor configureMessageConsumptionByQuerySensor(Metrics metrics) {
        Sensor sensor = this.createSensor(metrics, "message-consumption-by-query");
        sensor.add(metrics.metricName("messages-consumed-max", this.metricGroupName), (MeasurableStat)new Max());
        sensor.add(metrics.metricName("messages-consumed-min", this.metricGroupName), (MeasurableStat)new Min());
        sensor.add(metrics.metricName("messages-consumed-avg", this.metricGroupName), (MeasurableStat)new Avg());
        return sensor;
    }

    private Sensor createSensor(Metrics metrics, String sensorName) {
        Sensor sensor = metrics.sensor(sensorName);
        this.sensors.add(sensor);
        return sensor;
    }
}

