package io.confluent.ksql.internal;

import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Value;

/* loaded from: input_file:io/confluent/ksql/internal/KsqlEngineMetrics.class */
public class KsqlEngineMetrics implements Closeable {
    private final List<Sensor> sensors = new ArrayList();
    private final String metricGroupName;
    private final Sensor numActiveQueries;
    private final Sensor messagesIn;
    private final Sensor totalMessagesIn;
    private final Sensor totalBytesIn;
    private final Sensor messagesOut;
    private final Sensor numIdleQueries;
    private final Sensor messageConsumptionByQuery;
    private final Sensor errorRate;
    private final KsqlEngine ksqlEngine;

    public KsqlEngineMetrics(String str, KsqlEngine ksqlEngine) {
        this.ksqlEngine = ksqlEngine;
        this.metricGroupName = str + "-query-stats";
        Metrics metrics = MetricCollectors.getMetrics();
        this.numActiveQueries = configureNumActiveQueries(metrics);
        this.messagesIn = configureMessagesIn(metrics);
        this.totalMessagesIn = configureTotalMessagesIn(metrics);
        this.totalBytesIn = configureTotalBytesIn(metrics);
        this.messagesOut = configureMessagesOut(metrics);
        this.numIdleQueries = configureIdleQueriesSensor(metrics);
        this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor(metrics);
        this.errorRate = configureErrorRate(metrics);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Metrics metrics = MetricCollectors.getMetrics();
        this.sensors.forEach(sensor -> {
            metrics.removeSensor(sensor.name());
        });
    }

    public void updateMetrics() {
        recordMessagesConsumed(MetricCollectors.currentConsumptionRate());
        recordTotalMessagesConsumed(MetricCollectors.totalMessageConsumption());
        recordTotalBytesConsumed(MetricCollectors.totalBytesConsumption());
        recordMessagesProduced(MetricCollectors.currentProductionRate());
        recordMessageConsumptionByQueryStats(MetricCollectors.currentConsumptionRateByQuery());
        recordErrorRate(MetricCollectors.currentErrorRate());
    }

    List<Sensor> registeredSensors() {
        return this.sensors;
    }

    private void recordMessageConsumptionByQueryStats(Collection<Double> collection) {
        this.numIdleQueries.record(collection.stream().filter(d -> {
            return d.doubleValue() == 0.0d;
        }).count());
        Sensor sensor = this.messageConsumptionByQuery;
        sensor.getClass();
        collection.forEach((v1) -> {
            r1.record(v1);
        });
    }

    private void recordMessagesProduced(double d) {
        this.messagesOut.record(d);
    }

    private void recordMessagesConsumed(double d) {
        this.messagesIn.record(d);
    }

    private void recordTotalBytesConsumed(double d) {
        this.totalBytesIn.record(d);
    }

    private void recordTotalMessagesConsumed(double d) {
        this.totalMessagesIn.record(d);
    }

    private void recordErrorRate(double d) {
        this.errorRate.record(d);
    }

    private Sensor configureErrorRate(Metrics metrics) {
        Sensor createSensor = createSensor(metrics, this.metricGroupName + "-error-rate");
        createSensor.add(metrics.metricName("error-rate", this.metricGroupName, "The number of messages which were consumed but not processed. Messages may not be processed if, for instance, the message contents could not be deserialized due to an incompatible schema. Alternately, a consumed messages may not have been produced, hence being effectively dropped. Such messages would also be counted toward the error rate."), new Value());
        return createSensor;
    }

    private Sensor configureMessagesOut(Metrics metrics) {
        Sensor createSensor = createSensor(metrics, this.metricGroupName + "-messages-produced");
        createSensor.add(metrics.metricName("messages-produced-per-sec", this.metricGroupName, "The number of messages produced per second across all queries"), new Value());
        return createSensor;
    }

    private Sensor configureMessagesIn(Metrics metrics) {
        Sensor createSensor = createSensor(metrics, this.metricGroupName + "-messages-consumed");
        createSensor.add(metrics.metricName("messages-consumed-per-sec", this.metricGroupName, "The number of messages consumed per second across all queries"), new Value());
        return createSensor;
    }

    private Sensor configureTotalMessagesIn(Metrics metrics) {
        Sensor createSensor = createSensor(metrics, this.metricGroupName + "-total-messages-consumed");
        createSensor.add(metrics.metricName("messages-consumed-total", this.metricGroupName, "The total number of messages consumed across all queries"), new Value());
        return createSensor;
    }

    private Sensor configureTotalBytesIn(Metrics metrics) {
        Sensor createSensor = createSensor(metrics, this.metricGroupName + "-total-bytes-consumed");
        createSensor.add(metrics.metricName("bytes-consumed-total", this.metricGroupName, "The total number of bytes consumed across all queries"), new Value());
        return createSensor;
    }

    private Sensor configureNumActiveQueries(Metrics metrics) {
        Sensor createSensor = createSensor(metrics, this.metricGroupName + "-active-queries");
        createSensor.add(metrics.metricName("num-active-queries", this.metricGroupName, "The current number of active queries running in this engine"), new MeasurableStat() { // from class: io.confluent.ksql.internal.KsqlEngineMetrics.1
            public double measure(MetricConfig metricConfig, long j) {
                return KsqlEngineMetrics.this.ksqlEngine.numberOfLiveQueries();
            }

            public void record(MetricConfig metricConfig, double d, long j) {
            }
        });
        createSensor.add(metrics.metricName("num-persistent-queries", this.metricGroupName, "The current number of persistent queries running in this engine"), new MeasurableStat() { // from class: io.confluent.ksql.internal.KsqlEngineMetrics.2
            public double measure(MetricConfig metricConfig, long j) {
                return KsqlEngineMetrics.this.ksqlEngine.numberOfPersistentQueries();
            }

            public void record(MetricConfig metricConfig, double d, long j) {
            }
        });
        return createSensor;
    }

    private Sensor configureIdleQueriesSensor(Metrics metrics) {
        Sensor createSensor = createSensor(metrics, "num-idle-queries");
        createSensor.add(metrics.metricName("num-idle-queries", this.metricGroupName), new Value());
        return createSensor;
    }

    private Sensor configureMessageConsumptionByQuerySensor(Metrics metrics) {
        Sensor createSensor = createSensor(metrics, "message-consumption-by-query");
        createSensor.add(metrics.metricName("messages-consumed-max", this.metricGroupName), new Max());
        createSensor.add(metrics.metricName("messages-consumed-min", this.metricGroupName), new Min());
        createSensor.add(metrics.metricName("messages-consumed-avg", this.metricGroupName), new Avg());
        return createSensor;
    }

    private Sensor createSensor(Metrics metrics, String str) {
        Sensor sensor = metrics.sensor(str);
        this.sensors.add(sensor);
        return sensor;
    }
}
