/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.channel.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaChannel
extends BasicChannelSemantics {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaChannel.class);
    private final Properties kafkaConf = new Properties();
    private Producer<String, byte[]> producer;
    private final String channelUUID = UUID.randomUUID().toString();
    private AtomicReference<String> topic = new AtomicReference();
    private boolean parseAsFlumeEvent = true;
    private final Map<String, Integer> topicCountMap = Collections.synchronizedMap(new HashMap());
    private final List<ConsumerAndIterator> consumers = Collections.synchronizedList(new LinkedList());
    private KafkaChannelCounter counter;
    private final ThreadLocal<ConsumerAndIterator> consumerAndIter = new ThreadLocal<ConsumerAndIterator>(){

        @Override
        public ConsumerAndIterator initialValue() {
            return KafkaChannel.this.createConsumerAndIter();
        }
    };

    public void start() {
        try {
            LOGGER.info("Starting Kafka Channel: " + this.getName());
            this.producer = new Producer(new ProducerConfig(this.kafkaConf));
            LOGGER.info("Topic = " + this.topic.get());
            this.topicCountMap.put(this.topic.get(), 1);
            this.counter.start();
            super.start();
        }
        catch (Exception e) {
            LOGGER.error("Could not start producer");
            throw new FlumeException("Unable to create Kafka Connections. Check whether Kafka Brokers are up and that the Flume agent can connect to it.", (Throwable)e);
        }
    }

    public void stop() {
        for (ConsumerAndIterator c : this.consumers) {
            try {
                this.decommissionConsumerAndIterator(c);
            }
            catch (Exception ex) {
                LOGGER.warn("Error while shutting down consumer.", (Throwable)ex);
            }
        }
        this.producer.close();
        this.counter.stop();
        super.stop();
        LOGGER.info("Kafka channel {} stopped. Metrics: {}", (Object)this.getName(), (Object)this.counter);
    }

    protected BasicTransactionSemantics createTransaction() {
        return new KafkaTransaction();
    }

    private synchronized ConsumerAndIterator createConsumerAndIter() {
        try {
            ConsumerConfig consumerConfig = new ConsumerConfig(this.kafkaConf);
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector((ConsumerConfig)consumerConfig);
            Map consumerMap = consumer.createMessageStreams(this.topicCountMap);
            List streamList = (List)consumerMap.get(this.topic.get());
            KafkaStream stream = (KafkaStream)streamList.remove(0);
            ConsumerAndIterator ret = new ConsumerAndIterator(consumer, (ConsumerIterator<byte[], byte[]>)stream.iterator(), this.channelUUID);
            this.consumers.add(ret);
            LOGGER.info("Created new consumer to connect to Kafka");
            return ret;
        }
        catch (Exception e) {
            throw new FlumeException("Unable to connect to Kafka", (Throwable)e);
        }
    }

    Properties getKafkaConf() {
        return this.kafkaConf;
    }

    public void configure(Context ctx) {
        String brokerList;
        String topicStr = ctx.getString("topic");
        if (topicStr == null || topicStr.isEmpty()) {
            topicStr = "flume-channel";
            LOGGER.info("Topic was not specified. Using " + topicStr + " as the topic.");
        }
        this.topic.set(topicStr);
        String groupId = ctx.getString("groupId");
        if (groupId == null || groupId.isEmpty()) {
            groupId = "flume";
            LOGGER.info("Group ID was not specified. Using " + groupId + " as the group id.");
        }
        if ((brokerList = ctx.getString("brokerList")) == null || brokerList.isEmpty()) {
            throw new ConfigurationException("Broker List must be specified");
        }
        String zkConnect = ctx.getString("zookeeperConnect");
        if (zkConnect == null || zkConnect.isEmpty()) {
            throw new ConfigurationException("Zookeeper Connection must be specified");
        }
        Long timeout = ctx.getLong("timeout", Long.valueOf("100"));
        this.kafkaConf.putAll((Map<?, ?>)ctx.getSubProperties("kafka."));
        this.kafkaConf.put("group.id", groupId);
        this.kafkaConf.put("metadata.broker.list", brokerList);
        this.kafkaConf.put("zookeeper.connect", zkConnect);
        this.kafkaConf.put("auto.commit.enable", String.valueOf(false));
        this.kafkaConf.put("consumer.timeout.ms", String.valueOf(timeout));
        this.kafkaConf.put("request.required.acks", "-1");
        LOGGER.info(this.kafkaConf.toString());
        this.parseAsFlumeEvent = ctx.getBoolean("parseAsFlumeEvent", Boolean.valueOf(true));
        boolean readSmallest = ctx.getBoolean("readSmallestOffset", Boolean.valueOf(false));
        if (this.parseAsFlumeEvent || readSmallest) {
            this.kafkaConf.put("auto.offset.reset", "smallest");
        }
        if (this.counter == null) {
            this.counter = new KafkaChannelCounter(this.getName());
        }
    }

    private void decommissionConsumerAndIterator(ConsumerAndIterator c) {
        if (c.failedEvents.isEmpty()) {
            c.consumer.commitOffsets();
        }
        c.failedEvents.clear();
        c.consumer.shutdown();
    }

    @VisibleForTesting
    void registerThread() {
        this.consumerAndIter.get();
    }

    private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> stringMap) {
        HashMap<CharSequence, CharSequence> charSeqMap = new HashMap<CharSequence, CharSequence>();
        for (Map.Entry<String, String> entry : stringMap.entrySet()) {
            charSeqMap.put(entry.getKey(), entry.getValue());
        }
        return charSeqMap;
    }

    private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
        HashMap<String, String> stringMap = new HashMap<String, String>();
        for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
            stringMap.put(entry.getKey().toString(), entry.getValue().toString());
        }
        return stringMap;
    }

    private class ConsumerAndIterator {
        final ConsumerConnector consumer;
        final ConsumerIterator<byte[], byte[]> iterator;
        final String uuid;
        final LinkedList<Event> failedEvents = new LinkedList();

        ConsumerAndIterator(ConsumerConnector consumerConnector, ConsumerIterator<byte[], byte[]> iterator, String uuid) {
            this.consumer = consumerConnector;
            this.iterator = iterator;
            this.uuid = uuid;
        }
    }

    private class KafkaTransaction
    extends BasicTransactionSemantics {
        private TransactionType type = TransactionType.NONE;
        private Optional<ByteArrayOutputStream> tempOutStream = Optional.absent();
        private Optional<LinkedList<byte[]>> serializedEvents = Optional.absent();
        private Optional<LinkedList<Event>> events = Optional.absent();
        private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = Optional.absent();
        private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = Optional.absent();
        private BinaryEncoder encoder = null;
        private BinaryDecoder decoder = null;
        private final String batchUUID = UUID.randomUUID().toString();
        private boolean eventTaken = false;

        private KafkaTransaction() {
        }

        protected void doPut(Event event) throws InterruptedException {
            this.type = TransactionType.PUT;
            if (!this.serializedEvents.isPresent()) {
                this.serializedEvents = Optional.of(new LinkedList());
            }
            try {
                if (!this.tempOutStream.isPresent()) {
                    this.tempOutStream = Optional.of((Object)new ByteArrayOutputStream());
                }
                if (!this.writer.isPresent()) {
                    this.writer = Optional.of((Object)new SpecificDatumWriter(AvroFlumeEvent.class));
                }
                ((ByteArrayOutputStream)this.tempOutStream.get()).reset();
                AvroFlumeEvent e = new AvroFlumeEvent(KafkaChannel.toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
                this.encoder = EncoderFactory.get().directBinaryEncoder((OutputStream)this.tempOutStream.get(), this.encoder);
                ((SpecificDatumWriter)this.writer.get()).write((Object)e, (Encoder)this.encoder);
                ((LinkedList)this.serializedEvents.get()).add(((ByteArrayOutputStream)this.tempOutStream.get()).toByteArray());
            }
            catch (Exception e) {
                throw new ChannelException("Error while serializing event", (Throwable)e);
            }
        }

        protected Event doTake() throws InterruptedException {
            Event e;
            this.type = TransactionType.TAKE;
            try {
                if (!((ConsumerAndIterator)((KafkaChannel)KafkaChannel.this).consumerAndIter.get()).uuid.equals(KafkaChannel.this.channelUUID)) {
                    LOGGER.info("UUID mismatch, creating new consumer");
                    KafkaChannel.this.decommissionConsumerAndIterator((ConsumerAndIterator)KafkaChannel.this.consumerAndIter.get());
                    KafkaChannel.this.consumerAndIter.remove();
                }
            }
            catch (Exception ex) {
                LOGGER.warn("Error while shutting down consumer", (Throwable)ex);
            }
            if (!this.events.isPresent()) {
                this.events = Optional.of(new LinkedList());
            }
            if (!((ConsumerAndIterator)((KafkaChannel)KafkaChannel.this).consumerAndIter.get()).failedEvents.isEmpty()) {
                e = ((ConsumerAndIterator)((KafkaChannel)KafkaChannel.this).consumerAndIter.get()).failedEvents.removeFirst();
            } else {
                try {
                    ConsumerIterator<byte[], byte[]> it = ((ConsumerAndIterator)((KafkaChannel)KafkaChannel.this).consumerAndIter.get()).iterator;
                    long startTime = System.nanoTime();
                    it.hasNext();
                    long endTime = System.nanoTime();
                    KafkaChannel.this.counter.addToKafkaEventGetTimer((endTime - startTime) / 1000000L);
                    if (KafkaChannel.this.parseAsFlumeEvent) {
                        ByteArrayInputStream in = new ByteArrayInputStream((byte[])it.next().message());
                        this.decoder = DecoderFactory.get().directBinaryDecoder((InputStream)in, this.decoder);
                        if (!this.reader.isPresent()) {
                            this.reader = Optional.of((Object)new SpecificDatumReader(AvroFlumeEvent.class));
                        }
                        AvroFlumeEvent event = (AvroFlumeEvent)((SpecificDatumReader)this.reader.get()).read(null, (Decoder)this.decoder);
                        e = EventBuilder.withBody((byte[])event.getBody().array(), (Map)KafkaChannel.toStringMap(event.getHeaders()));
                    } else {
                        e = EventBuilder.withBody((byte[])((byte[])it.next().message()), (Map)Collections.EMPTY_MAP);
                    }
                }
                catch (ConsumerTimeoutException ex) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Timed out while waiting for data to come from Kafka", (Throwable)ex);
                    }
                    return null;
                }
                catch (Exception ex) {
                    LOGGER.warn("Error while getting events from Kafka", (Throwable)ex);
                    throw new ChannelException("Error while getting events from Kafka", (Throwable)ex);
                }
            }
            this.eventTaken = true;
            ((LinkedList)this.events.get()).add(e);
            return e;
        }

        protected void doCommit() throws InterruptedException {
            if (this.type.equals((Object)TransactionType.NONE)) {
                return;
            }
            if (this.type.equals((Object)TransactionType.PUT)) {
                try {
                    ArrayList<KeyedMessage> messages = new ArrayList<KeyedMessage>(((LinkedList)this.serializedEvents.get()).size());
                    for (byte[] event : (LinkedList)this.serializedEvents.get()) {
                        messages.add(new KeyedMessage((String)KafkaChannel.this.topic.get(), null, (Object)this.batchUUID, (Object)event));
                    }
                    long startTime = System.nanoTime();
                    KafkaChannel.this.producer.send(messages);
                    long endTime = System.nanoTime();
                    KafkaChannel.this.counter.addToKafkaEventSendTimer((endTime - startTime) / 1000000L);
                    KafkaChannel.this.counter.addToEventPutSuccessCount(Long.valueOf(messages.size()).longValue());
                    ((LinkedList)this.serializedEvents.get()).clear();
                }
                catch (Exception ex) {
                    LOGGER.warn("Sending events to Kafka failed", (Throwable)ex);
                    throw new ChannelException("Commit failed as send to Kafka failed", (Throwable)ex);
                }
            } else {
                if (((ConsumerAndIterator)((KafkaChannel)KafkaChannel.this).consumerAndIter.get()).failedEvents.isEmpty() && this.eventTaken) {
                    long startTime = System.nanoTime();
                    ((ConsumerAndIterator)((KafkaChannel)KafkaChannel.this).consumerAndIter.get()).consumer.commitOffsets();
                    long endTime = System.nanoTime();
                    KafkaChannel.this.counter.addToKafkaCommitTimer((endTime - startTime) / 1000000L);
                }
                KafkaChannel.this.counter.addToEventTakeSuccessCount(Long.valueOf(((LinkedList)this.events.get()).size()).longValue());
                ((LinkedList)this.events.get()).clear();
            }
        }

        protected void doRollback() throws InterruptedException {
            if (this.type.equals((Object)TransactionType.NONE)) {
                return;
            }
            if (this.type.equals((Object)TransactionType.PUT)) {
                ((LinkedList)this.serializedEvents.get()).clear();
            } else {
                KafkaChannel.this.counter.addToRollbackCounter(Long.valueOf(((LinkedList)this.events.get()).size()).longValue());
                ((ConsumerAndIterator)((KafkaChannel)KafkaChannel.this).consumerAndIter.get()).failedEvents.addAll((Collection)this.events.get());
                ((LinkedList)this.events.get()).clear();
            }
        }
    }

    private static enum TransactionType {
        PUT,
        TAKE,
        NONE;

    }
}

