package org.apache.flume.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/StressSource.class */
public class StressSource extends AbstractSource implements Configurable, PollableSource {
    private static final Logger logger = LoggerFactory.getLogger(StressSource.class);
    private byte[] buffer;
    private long maxTotalEvents;
    private long maxSuccessfulEvents;
    private int batchSize;
    private Event event;
    private List<Event> eventBatchList;
    private List<Event> eventBatchListToProcess;
    private long lastSent = 0;
    private CounterGroup counterGroup = new CounterGroup();

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        this.maxTotalEvents = context.getLong("maxTotalEvents", -1L).longValue();
        this.maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L).longValue();
        this.batchSize = context.getInteger("batchSize", 1).intValue();
        prepEventData(context.getInteger("size", 500).intValue());
    }

    private void prepEventData(int i) {
        this.buffer = new byte[i];
        Arrays.fill(this.buffer, Byte.MAX_VALUE);
        if (this.batchSize <= 1) {
            this.event = EventBuilder.withBody(this.buffer);
            return;
        }
        this.eventBatchList = new ArrayList();
        for (int i2 = 0; i2 < this.batchSize; i2++) {
            this.eventBatchList.add(EventBuilder.withBody(this.buffer));
        }
    }

    @Override // org.apache.flume.PollableSource
    public PollableSource.Status process() throws EventDeliveryException {
        long longValue = this.counterGroup.addAndGet("events.total", Long.valueOf(this.lastSent)).longValue();
        if ((this.maxTotalEvents >= 0 && longValue >= this.maxTotalEvents) || (this.maxSuccessfulEvents >= 0 && this.counterGroup.get("events.successful").longValue() >= this.maxSuccessfulEvents)) {
            return PollableSource.Status.BACKOFF;
        }
        try {
            this.lastSent = this.batchSize;
            if (this.batchSize == 1) {
                getChannelProcessor().processEvent(this.event);
            } else {
                long j = this.maxTotalEvents - longValue;
                if (this.maxTotalEvents < 0 || j >= this.batchSize) {
                    this.eventBatchListToProcess = this.eventBatchList;
                } else {
                    this.eventBatchListToProcess = this.eventBatchList.subList(0, (int) j);
                }
                this.lastSent = this.eventBatchListToProcess.size();
                getChannelProcessor().processEventBatch(this.eventBatchListToProcess);
            }
            this.counterGroup.addAndGet("events.successful", Long.valueOf(this.lastSent));
            return PollableSource.Status.READY;
        } catch (ChannelException e) {
            this.counterGroup.addAndGet("events.failed", Long.valueOf(this.lastSent));
            return PollableSource.Status.BACKOFF;
        }
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Stress source starting");
        super.start();
        logger.debug("Stress source started");
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Stress source stopping");
        super.stop();
        logger.info("Stress source stopped. Metrics:{}", this.counterGroup);
    }
}
