/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sqoop.common.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.common.test.kafka.TestUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin test helper around the Kafka high-level consumer API.
 *
 * <p>A {@link ConsumerConnector} is created when the instance is constructed, using the
 * ZooKeeper URL reported by {@link TestUtil} and a fixed consumer group. Call
 * {@link #initTopicList(List)} once to open the message streams, then
 * {@link #getNextMessage(String)} to read messages one at a time, and finally
 * {@link #shutdown()} to release the connector.
 */
public class KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    /** Consumer group used by every instance of this helper. */
    private static final String GROUP_ID = "group_1";

    private final ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(createConsumerConfig(TestUtil.getInstance().getZkUrl(), GROUP_ID));

    /** Streams per topic, populated by {@link #initTopicList(List)}; {@code null} until then. */
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;

    /**
     * Builds the consumer configuration for the given ZooKeeper URL and group id.
     *
     * @param zkUrl   value for {@code zookeeper.connect}
     * @param groupId value for {@code group.id}
     * @return a configuration with the timeouts and offset-reset policy set below
     */
    private static ConsumerConfig createConsumerConfig(String zkUrl, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkUrl);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "1000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // getNextMessage relies on the iterator raising ConsumerTimeoutException
        // instead of blocking forever; see the Kafka consumer configuration docs.
        props.put("consumer.timeout.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Opens one message stream for each of the given topics and stores the result
     * in {@link #consumerMap}.
     *
     * @param topics names of the topics to consume from
     */
    public void initTopicList(List<String> topics) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        for (String topic : topics) {
            // One stream per topic; getNextMessage always reads stream 0.
            topicCountMap.put(topic, Integer.valueOf(1));
        }
        this.consumerMap = this.consumer.createMessageStreams(topicCountMap);
    }

    /**
     * Returns the next message from the first stream of the given topic.
     *
     * @param topic a topic previously passed to {@link #initTopicList(List)}
     * @return the next message, or {@code null} if the iterator has no further element
     *         or a {@link ConsumerTimeoutException} was raised while waiting
     * @throws IllegalStateException    if {@link #initTopicList(List)} has not been called
     * @throws IllegalArgumentException if no stream is registered for {@code topic}
     */
    public MessageAndMetadata getNextMessage(String topic) {
        if (this.consumerMap == null) {
            throw new IllegalStateException("initTopicList must be called before getNextMessage");
        }
        List<KafkaStream<byte[], byte[]>> streams = this.consumerMap.get(topic);
        if (streams == null || streams.isEmpty()) {
            throw new IllegalArgumentException("No stream registered for topic " + topic);
        }
        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        try {
            return it.hasNext() ? it.next() : null;
        } catch (ConsumerTimeoutException e) {
            // A timeout is reported to the caller as "no message", not as a failure.
            logger.error("0 messages available to fetch for the topic {}", topic);
            return null;
        }
    }

    /** Shuts down the underlying consumer connector. */
    public void shutdown() {
        this.consumer.shutdown();
    }
}

