package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/workload/ConsumeBenchWorker.class */
public class ConsumeBenchWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
    private static final int THROTTLE_PERIOD_MS = 100;
    private final String id;
    private final ConsumeBenchSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ScheduledExecutorService executor;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;
    private KafkaConsumer<byte[], byte[]> consumer;

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/ConsumeBenchWorker$ConsumeMessages.class */
    public class ConsumeMessages implements Callable<Void> {
        private final Histogram latencyHistogram = new Histogram(5000);
        private final Histogram messageSizeHistogram = new Histogram(2097152);
        private final Future<?> statusUpdaterFuture;
        private final Throttle throttle;

        ConsumeMessages(Collection<TopicPartition> collection) {
            this.statusUpdaterFuture = ConsumeBenchWorker.this.executor.scheduleAtFixedRate(new StatusUpdater(this.latencyHistogram, this.messageSizeHistogram), 1L, 1L, TimeUnit.MINUTES);
            Properties properties = new Properties();
            properties.put("bootstrap.servers", ConsumeBenchWorker.this.spec.bootstrapServers());
            properties.put("client.id", "consumer." + ConsumeBenchWorker.this.id);
            properties.put("group.id", "consumer-group-1");
            properties.put("auto.offset.reset", "earliest");
            properties.put("max.poll.interval.ms", 100000);
            WorkerUtils.addConfigsToProperties(properties, ConsumeBenchWorker.this.spec.commonClientConf(), ConsumeBenchWorker.this.spec.consumerConf());
            ConsumeBenchWorker.this.consumer = new KafkaConsumer(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            ConsumeBenchWorker.this.consumer.assign(collection);
            this.throttle = new Throttle(WorkerUtils.perSecToPerPeriod(ConsumeBenchWorker.this.spec.targetMessagesPerSec(), 100L), ConsumeBenchWorker.THROTTLE_PERIOD_MS);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            long j = 0;
            long j2 = 0;
            long milliseconds = Time.SYSTEM.milliseconds();
            long j3 = milliseconds;
            while (j < ConsumeBenchWorker.this.spec.maxMessages()) {
                try {
                    try {
                        ConsumerRecords poll = ConsumeBenchWorker.this.consumer.poll(Duration.ofMillis(50L));
                        if (!poll.isEmpty()) {
                            long milliseconds2 = Time.SYSTEM.milliseconds() - j3;
                            Iterator it = poll.iterator();
                            while (it.hasNext()) {
                                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                                j++;
                                long j4 = 0;
                                if (consumerRecord.key() != null) {
                                    j4 = 0 + consumerRecord.serializedKeySize();
                                }
                                if (consumerRecord.value() != null) {
                                    j4 += consumerRecord.serializedValueSize();
                                }
                                this.latencyHistogram.add(milliseconds2);
                                this.messageSizeHistogram.add(j4);
                                j2 += j4;
                                this.throttle.increment();
                            }
                            j3 = Time.SYSTEM.milliseconds();
                        }
                    } catch (Exception e) {
                        WorkerUtils.abort(ConsumeBenchWorker.log, "ConsumeRecords", e, ConsumeBenchWorker.this.doneFuture);
                        this.statusUpdaterFuture.cancel(false);
                        ConsumeBenchWorker.log.info("Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{Long.valueOf(j), Long.valueOf(j2), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new StatusUpdater(this.latencyHistogram, this.messageSizeHistogram).update()});
                    }
                } catch (Throwable th) {
                    this.statusUpdaterFuture.cancel(false);
                    ConsumeBenchWorker.log.info("Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{Long.valueOf(j), Long.valueOf(j2), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new StatusUpdater(this.latencyHistogram, this.messageSizeHistogram).update()});
                    throw th;
                }
            }
            this.statusUpdaterFuture.cancel(false);
            ConsumeBenchWorker.log.info("Consumed total number of messages={}, bytes={} in {} ms.  status: {}", new Object[]{Long.valueOf(j), Long.valueOf(j2), Long.valueOf(Time.SYSTEM.milliseconds() - milliseconds), new StatusUpdater(this.latencyHistogram, this.messageSizeHistogram).update()});
            ConsumeBenchWorker.this.doneFuture.complete("");
            return null;
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/ConsumeBenchWorker$Prepare.class */
    public class Prepare implements Runnable {
        public Prepare() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                HashSet hashSet = new HashSet();
                for (Map.Entry<String, PartitionsSpec> entry : ConsumeBenchWorker.this.spec.activeTopics().materialize().entrySet()) {
                    Iterator<Integer> it = entry.getValue().partitionNumbers().iterator();
                    while (it.hasNext()) {
                        hashSet.add(new TopicPartition(entry.getKey(), it.next().intValue()));
                    }
                }
                ConsumeBenchWorker.log.info("Will consume from {}", hashSet);
                ConsumeBenchWorker.this.executor.submit(new ConsumeMessages(hashSet));
            } catch (Throwable th) {
                WorkerUtils.abort(ConsumeBenchWorker.log, "Prepare", th, ConsumeBenchWorker.this.doneFuture);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/ConsumeBenchWorker$StatusData.class */
    public static class StatusData {
        private final long totalMessagesReceived;
        private final long totalBytesReceived;
        private final long averageMessageSizeBytes;
        private final float averageLatencyMs;
        private final int p50LatencyMs;
        private final int p95LatencyMs;
        private final int p99LatencyMs;
        static final float[] PERCENTILES = {0.5f, 0.95f, 0.99f};

        @JsonCreator
        StatusData(@JsonProperty("totalMessagesReceived") long j, @JsonProperty("totalBytesReceived") long j2, @JsonProperty("averageMessageSizeBytes") long j3, @JsonProperty("averageLatencyMs") float f, @JsonProperty("p50LatencyMs") int i, @JsonProperty("p95LatencyMs") int i2, @JsonProperty("p99LatencyMs") int i3) {
            this.totalMessagesReceived = j;
            this.totalBytesReceived = j2;
            this.averageMessageSizeBytes = j3;
            this.averageLatencyMs = f;
            this.p50LatencyMs = i;
            this.p95LatencyMs = i2;
            this.p99LatencyMs = i3;
        }

        @JsonProperty
        public long totalMessagesReceived() {
            return this.totalMessagesReceived;
        }

        @JsonProperty
        public long totalBytesReceived() {
            return this.totalBytesReceived;
        }

        @JsonProperty
        public long averageMessageSizeBytes() {
            return this.averageMessageSizeBytes;
        }

        @JsonProperty
        public float averageLatencyMs() {
            return this.averageLatencyMs;
        }

        @JsonProperty
        public int p50LatencyMs() {
            return this.p50LatencyMs;
        }

        @JsonProperty
        public int p95LatencyMs() {
            return this.p95LatencyMs;
        }

        @JsonProperty
        public int p99LatencyMs() {
            return this.p99LatencyMs;
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/ConsumeBenchWorker$StatusUpdater.class */
    public class StatusUpdater implements Runnable {
        private final Histogram latencyHistogram;
        private final Histogram messageSizeHistogram;

        StatusUpdater(Histogram histogram, Histogram histogram2) {
            this.latencyHistogram = histogram;
            this.messageSizeHistogram = histogram2;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                update();
            } catch (Exception e) {
                WorkerUtils.abort(ConsumeBenchWorker.log, "StatusUpdater", e, ConsumeBenchWorker.this.doneFuture);
            }
        }

        StatusData update() {
            Histogram.Summary summarize = this.latencyHistogram.summarize(StatusData.PERCENTILES);
            Histogram.Summary summarize2 = this.messageSizeHistogram.summarize(StatusData.PERCENTILES);
            StatusData statusData = new StatusData(summarize.numSamples(), ((float) summarize2.numSamples()) * summarize2.average(), summarize2.average(), summarize.average(), summarize.percentiles().get(0).value(), summarize.percentiles().get(1).value(), summarize.percentiles().get(2).value());
            ConsumeBenchWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
            ConsumeBenchWorker.log.info("Status={}", JsonUtil.toJsonString(statusData));
            return statusData;
        }
    }

    public ConsumeBenchWorker(String str, ConsumeBenchSpec consumeBenchSpec) {
        this.id = str;
        this.spec = consumeBenchSpec;
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("ConsumeBenchWorker is already running.");
        }
        log.info("{}: Activating ConsumeBenchWorker with {}", this.id, this.spec);
        this.executor = Executors.newScheduledThreadPool(2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
        this.status = workerStatusTracker;
        this.doneFuture = kafkaFutureImpl;
        this.executor.submit(new Prepare());
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("ConsumeBenchWorker is not running.");
        }
        log.info("{}: Deactivating ConsumeBenchWorker.", this.id);
        this.doneFuture.complete("");
        this.executor.shutdownNow();
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
        Utils.closeQuietly(this.consumer, "consumer");
        this.consumer = null;
        this.executor = null;
        this.status = null;
        this.doneFuture = null;
    }
}
