/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.source.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
import org.apache.flume.source.AbstractPollableSource;
import org.apache.flume.source.kafka.KafkaSourceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pollable Flume source that reads messages from a single Kafka topic through
 * the Kafka high-level consumer and forwards them to the channel processor in
 * batches.
 *
 * <p>A batch is closed when either {@code batchSize} events have been
 * collected or {@code batchDurationMillis} milliseconds have elapsed,
 * whichever happens first. When Kafka auto-commit is disabled, offsets are
 * committed explicitly after a batch has been handed to the channel
 * processor.
 */
public class KafkaSource
extends AbstractPollableSource
implements Configurable {
    private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
    private ConsumerConnector consumer;
    private ConsumerIterator<byte[], byte[]> it;
    private String topic;
    private int batchUpperLimit;
    private int timeUpperLimit;
    private int consumerTimeout;
    private boolean kafkaAutoCommitEnabled;
    private Context context;
    private Properties kafkaProps;
    // Events of the batch currently being assembled. It is only cleared after
    // a successful hand-off, so a failed hand-off leaves the events in place
    // for the next doProcess() call.
    private final List<Event> eventList = new ArrayList<Event>();
    private KafkaSourceCounter counter;

    /**
     * Collects one batch of Kafka messages, hands it to the channel processor
     * and, if auto-commit is disabled, commits the consumer offsets.
     *
     * @return {@code READY} if the last poll of the iterator yielded a
     *         message; {@code BACKOFF} if the last poll timed out, if no poll
     *         happened at all, or if any exception was raised while processing
     * @throws EventDeliveryException declared by the contract; this
     *         implementation catches and logs all exceptions itself
     */
    protected PollableSource.Status doProcess() throws EventDeliveryException {
        long batchStartTime = System.currentTimeMillis();
        long batchEndTime = batchStartTime + (long)this.timeUpperLimit;
        try {
            boolean iterStatus = false;
            long startTime = System.nanoTime();
            while (this.eventList.size() < this.batchUpperLimit && System.currentTimeMillis() < batchEndTime) {
                iterStatus = this.hasNext();
                if (iterStatus) {
                    this.eventList.add(this.buildEvent(this.it.next()));
                }
                if (log.isDebugEnabled()) {
                    log.debug("Waited: {} ", (Object)(System.currentTimeMillis() - batchStartTime));
                    log.debug("Event #: {}", (Object)this.eventList.size());
                }
            }
            long endTime = System.nanoTime();
            this.counter.addToKafkaEventGetTimer((endTime - startTime) / 1000000L);
            this.counter.addToEventReceivedCount((long)this.eventList.size());
            if (this.eventList.size() > 0) {
                // Capture the size before clearing so the metric and the debug
                // line both report the number of events actually written.
                int batchSize = this.eventList.size();
                this.getChannelProcessor().processEventBatch(this.eventList);
                this.counter.addToEventAcceptedCount((long)batchSize);
                this.eventList.clear();
                if (log.isDebugEnabled()) {
                    log.debug("Wrote {} events to channel", (Object)batchSize);
                }
                if (!this.kafkaAutoCommitEnabled) {
                    long commitStartTime = System.nanoTime();
                    this.consumer.commitOffsets();
                    long commitEndTime = System.nanoTime();
                    this.counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / 1000000L);
                }
            }
            if (!iterStatus) {
                // The metric must be recorded regardless of the log level;
                // only the log statement itself is guarded.
                this.counter.incrementKafkaEmptyCount();
                if (log.isDebugEnabled()) {
                    log.debug("Returning with backoff. No more data to read");
                }
                return PollableSource.Status.BACKOFF;
            }
            return PollableSource.Status.READY;
        }
        catch (Exception e) {
            log.error("KafkaSource EXCEPTION, {}", (Throwable)e);
            return PollableSource.Status.BACKOFF;
        }
    }

    /**
     * Converts one Kafka message into a Flume event. The headers carry the
     * current wall-clock time ({@code timestamp}), the configured topic
     * ({@code topic}) and, when the message has a key, the key decoded as a
     * string ({@code key}).
     */
    private Event buildEvent(MessageAndMetadata<byte[], byte[]> messageAndMetadata) {
        byte[] kafkaMessage = messageAndMetadata.message();
        byte[] kafkaKey = messageAndMetadata.key();
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
        headers.put("topic", this.topic);
        if (kafkaKey != null) {
            // NOTE(review): decodes with the platform default charset.
            headers.put("key", new String(kafkaKey));
        }
        if (log.isDebugEnabled()) {
            log.debug("Message: {}", (Object)new String(kafkaMessage));
        }
        return EventBuilder.withBody(kafkaMessage, headers);
    }

    /**
     * Reads the source configuration.
     *
     * <p>{@code batchSize} and {@code batchDurationMillis} both default to
     * 1000; {@code topic} is mandatory. The Kafka consumer properties are
     * derived from the context by {@link KafkaSourceUtil#getKafkaProperties}.
     *
     * @param context the Flume configuration context of this source
     * @throws ConfigurationException if no topic is configured
     */
    protected void doConfigure(Context context) throws FlumeException {
        this.context = context;
        this.batchUpperLimit = context.getInteger("batchSize", Integer.valueOf(1000));
        this.timeUpperLimit = context.getInteger("batchDurationMillis", Integer.valueOf(1000));
        this.topic = context.getString("topic");
        if (this.topic == null) {
            throw new ConfigurationException("Kafka topic must be specified.");
        }
        this.kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
        // NOTE(review): assumes getKafkaProperties always supplies
        // "consumer.timeout.ms"; parseInt fails on a missing value - confirm.
        this.consumerTimeout = Integer.parseInt(this.kafkaProps.getProperty("consumer.timeout.ms"));
        this.kafkaAutoCommitEnabled = Boolean.parseBoolean(this.kafkaProps.getProperty("auto.commit.enable"));
        if (this.counter == null) {
            this.counter = new KafkaSourceCounter(this.getName());
        }
    }

    /**
     * Creates the Kafka consumer, opens a single message stream for the
     * configured topic and starts the metrics counter.
     *
     * @throws FlumeException if the consumer cannot be created or the message
     *         iterator cannot be obtained
     */
    protected void doStart() throws FlumeException {
        log.info("Starting {}...", (Object)this);
        try {
            this.consumer = KafkaSourceUtil.getConsumer(this.kafkaProps);
        }
        catch (Exception e) {
            throw new FlumeException("Unable to create consumer. Check whether the ZooKeeper server is up and that the Flume agent can connect to it.", (Throwable)e);
        }
        // Request exactly one stream for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(this.topic, 1);
        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = this.consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(this.topic);
            KafkaStream<byte[], byte[]> stream = topicList.get(0);
            this.it = stream.iterator();
        }
        catch (Exception e) {
            throw new FlumeException("Unable to get message iterator from Kafka", (Throwable)e);
        }
        log.info("Kafka source {} do started.", (Object)this.getName());
        this.counter.start();
    }

    /**
     * Shuts down the Kafka consumer (if one was created) and stops the
     * metrics counter.
     */
    protected void doStop() throws FlumeException {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        this.counter.stop();
        log.info("Kafka Source {} do stopped. Metrics: {}", (Object)this.getName(), (Object)this.counter);
    }

    /**
     * Checks whether a message can be read from the Kafka iterator.
     *
     * @return {@code true} if polling the iterator did not time out;
     *         {@code false} if a {@link ConsumerTimeoutException} was raised
     */
    boolean hasNext() {
        try {
            // NOTE(review): the boolean result of it.hasNext() is discarded;
            // only the timeout exception is treated as "no data". Confirm
            // whether a false return (exhausted iterator) needs handling.
            this.it.hasNext();
            return true;
        }
        catch (ConsumerTimeoutException e) {
            return false;
        }
    }
}

