package org.apache.flume.source.kafka.v09;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
import org.apache.flume.source.AbstractPollableSource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/kafka/v09/KafkaSource.class */
/**
 * Flume pollable source that reads records from one or more Kafka topics with a
 * {@link KafkaConsumer}, hands them to the channel processor in batches and
 * commits the consumed offsets once a batch has been delivered.
 *
 * <p>A batch is closed when it reaches the configured batch size, when the
 * configured batch duration elapses, when a poll returns no records, or when the
 * rebalance flag is found set after a poll.
 *
 * <p>The internal lock serialises the offset commit in {@link #doProcess()} with
 * the consumer shutdown in {@link #doStop()}.
 */
public class KafkaSource extends AbstractPollableSource implements Configurable {
    private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
    private Context context;
    private Properties kafkaProps;
    private KafkaSourceCounter counter;
    private KafkaConsumer<String, byte[]> consumer;
    /** Iterator over the records returned by the most recent poll; may still hold unread records. */
    private Iterator<ConsumerRecord<String, byte[]>> it;
    /** Events of the batch currently being assembled; cleared only after successful delivery. */
    private final List<Event> eventList = new ArrayList<>();
    /** Next offset to commit per partition for the records read so far. */
    private Map<TopicPartition, OffsetAndMetadata> toBeCommitted;
    /** Handed to the rebalance listener in {@link #doStart()}; presumably set by it on rebalance. */
    private AtomicBoolean rebalanceFlag;
    /** Scratch header map reused for every record. */
    private Map<String, String> headers;
    private int batchUpperLimit;
    private int maxBatchDurationMillis;
    private List<String> topicList;
    private Lock lock;

    /**
     * Assembles one batch of events from Kafka, delivers it to the channel
     * processor and commits the corresponding offsets.
     *
     * @return {@code READY} when a batch was delivered and committed;
     *         {@code BACKOFF} when no events were available or any exception occurred
     * @throws EventDeliveryException declared by the contract; exceptions raised
     *         inside this method are logged and converted to {@code BACKOFF}
     */
    protected PollableSource.Status doProcess() throws EventDeliveryException {
        final String batchUUID = UUID.randomUUID().toString();
        try {
            final long startNanos = System.nanoTime();
            final long batchStartTime = System.currentTimeMillis();
            final long maxBatchEndTime = batchStartTime + this.maxBatchDurationMillis;

            while (this.eventList.size() < this.batchUpperLimit
                    && System.currentTimeMillis() < maxBatchEndTime) {
                if (this.it == null || !this.it.hasNext()) {
                    // Never wait past the end of the batch window.
                    long timeout = Math.max(0L, maxBatchEndTime - System.currentTimeMillis());
                    this.it = this.consumer.poll(timeout).iterator();

                    // Stop filling the batch once a rebalance has been signalled.
                    if (this.rebalanceFlag.compareAndSet(true, false)) {
                        break;
                    }
                    if (!this.it.hasNext()) {
                        // Count empty polls regardless of the log level, and leave the
                        // loop: calling next() on the empty iterator would throw
                        // NoSuchElementException.
                        this.counter.incrementKafkaEmptyCount();
                        log.debug("Returning with backoff. No more data to read");
                        break;
                    }
                }

                ConsumerRecord<String, byte[]> record = this.it.next();
                byte[] body = record.value();
                String key = record.key();

                this.headers.put(KafkaSourceConstants.TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()));
                this.headers.put(KafkaSourceConstants.TOPIC_HEADER, record.topic());
                this.headers.put(KafkaSourceConstants.PARTITION_HEADER, String.valueOf(record.partition()));
                if (key != null) {
                    this.headers.put(KafkaSourceConstants.KEY_HEADER, key);
                } else {
                    // The header map is reused across records; drop the key left
                    // behind by a previous record so it does not leak into this event.
                    this.headers.remove(KafkaSourceConstants.KEY_HEADER);
                }

                if (log.isDebugEnabled()) {
                    // A record value may be null; new String(null) would throw.
                    log.debug("Message: {}", body == null ? null : new String(body));
                    log.debug("Topic: {} Partition: {}", record.topic(), record.partition());
                }

                this.eventList.add(EventBuilder.withBody(body, this.headers));

                if (log.isDebugEnabled()) {
                    log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
                    log.debug("Event #: {}", this.eventList.size());
                }

                // Commit position is the offset of the next record to read.
                this.toBeCommitted.put(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, batchUUID));
            }

            if (this.eventList.isEmpty()) {
                return PollableSource.Status.BACKOFF;
            }

            this.counter.addToKafkaEventGetTimer((System.nanoTime() - startNanos) / 1000000);
            this.counter.addToEventReceivedCount(this.eventList.size());
            getChannelProcessor().processEventBatch(this.eventList);
            this.counter.addToEventAcceptedCount(this.eventList.size());
            if (log.isDebugEnabled()) {
                log.debug("Wrote {} events to channel", this.eventList.size());
            }
            this.eventList.clear();

            commitOffsets();
            return PollableSource.Status.READY;
        } catch (Exception e) {
            log.error("KafkaSource EXCEPTION, {}", e);
            return PollableSource.Status.BACKOFF;
        }
    }

    /** Synchronously commits the pending offsets under the lock and records the commit time. */
    private void commitOffsets() {
        this.lock.lock();
        try {
            long commitStartNanos = System.nanoTime();
            this.consumer.commitSync(this.toBeCommitted);
            this.counter.addToKafkaCommitTimer((System.nanoTime() - commitStartNanos) / 1000000);
            this.toBeCommitted.clear();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Reads the source configuration: topic list, batch size, batch duration and
     * the Kafka consumer properties, and (re)initialises the per-source state.
     *
     * @param context the Flume configuration context for this source
     * @throws ConfigurationException if no non-empty topic name is configured
     * @throws FlumeException declared by the contract
     */
    protected void doConfigure(Context context) throws FlumeException {
        this.context = context;
        this.headers = new HashMap<>(4);
        this.toBeCommitted = new HashMap<>();
        this.lock = new ReentrantLock();
        this.rebalanceFlag = new AtomicBoolean(false);

        String topics = context.getString(KafkaSourceConstants.TOPICS);
        if (topics == null) {
            throw new ConfigurationException("At least one Kafka topic must be specified.");
        }
        this.topicList = parseTopics(topics);
        if (this.topicList.isEmpty()) {
            throw new ConfigurationException("At least one Kafka topic must be specified.");
        }

        this.batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE, 1000).intValue();
        this.maxBatchDurationMillis = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS, 1000).intValue();
        setConsumerProps(context, context.getString(KafkaSourceConstants.BOOTSTRAP_SERVERS));
        if (this.counter == null) {
            this.counter = new KafkaSourceCounter(getName());
        }
    }

    /**
     * Splits a comma-separated topic list, trimming surrounding whitespace and
     * dropping empty entries (e.g. from leading whitespace or a doubled comma).
     */
    private static List<String> parseTopics(String topics) {
        List<String> parsed = new ArrayList<>();
        for (String topic : topics.split(",")) {
            String trimmed = topic.trim();
            if (!trimmed.isEmpty()) {
                parsed.add(trimmed);
            }
        }
        return parsed;
    }

    /**
     * Builds the Kafka consumer properties. Deserializer defaults are applied
     * first, then every property under the consumer prefix of the context, and
     * finally the settings that always take precedence: bootstrap servers (when
     * given), group id and the auto-commit setting.
     *
     * @param context the Flume configuration context
     * @param str     the bootstrap server list, or {@code null} to leave it to the
     *                consumer-prefixed properties
     */
    private void setConsumerProps(Context context, String str) {
        this.kafkaProps = new Properties();
        String groupId = context.getString("kafka.consumer.group.id");
        if (groupId == null || groupId.isEmpty()) {
            groupId = KafkaSourceConstants.DEFAULT_GROUP_ID;
            log.info("Group ID was not specified. Using {} as the group id.", groupId);
        }
        this.kafkaProps.put("key.deserializer", KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
        this.kafkaProps.put("value.deserializer", KafkaSourceConstants.DEFAULT_VALUE_DESERIAIZER);
        // Defaults above may be overridden by user-supplied consumer properties.
        this.kafkaProps.putAll(context.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
        // The settings below always win over user-supplied consumer properties.
        if (str != null) {
            this.kafkaProps.put("bootstrap.servers", str);
        }
        this.kafkaProps.put("group.id", groupId);
        this.kafkaProps.put("enable.auto.commit", KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
        // NOTE(review): this logs every consumer property, which may include credentials.
        log.info(this.kafkaProps.toString());
    }

    /** Returns the consumer properties built by the last configuration; {@code null} before that. */
    Properties getConsumerProps() {
        return this.kafkaProps;
    }

    /**
     * Creates the Kafka consumer, subscribes it to the configured topics, performs
     * an initial poll and starts the counter.
     *
     * @throws FlumeException if the consumer cannot be created, subscribed or polled
     */
    protected void doStart() throws FlumeException {
        log.info("Starting {}...", this);
        try {
            this.consumer = new KafkaConsumer<>(this.kafkaProps);
            this.consumer.subscribe(this.topicList, new SourceRebalanceListener(this.rebalanceFlag));
            this.it = this.consumer.poll(1000L).iterator();
            log.info("Kafka source {} started.", getName());
            this.counter.start();
        } catch (Exception e) {
            throw new FlumeException("Unable to create consumer. Check whether the Bootstrap server is up and that the Flume agent can connect to it.", e);
        }
    }

    /**
     * Wakes up and closes the consumer (if one was created) while holding the
     * lock, then stops the counter.
     *
     * @throws FlumeException declared by the contract
     */
    protected void doStop() throws FlumeException {
        if (this.consumer != null) {
            this.lock.lock();
            try {
                this.consumer.wakeup();
                this.consumer.close();
            } finally {
                this.lock.unlock();
            }
        }
        this.counter.stop();
        log.info("Kafka Source {} stopped. Metrics: {}", getName(), this.counter);
    }
}
