package org.apache.flume.sink.kafka;

import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/kafka/KafkaSink.class */
/**
 * Flume sink that takes events from its channel in batches and publishes them
 * to Kafka through a {@code kafka.javaapi.producer.Producer}.
 *
 * <p>For each event the destination topic is read from the {@link #TOPIC_HDR}
 * header, falling back to the topic supplied at configuration time, and the
 * message key is read from the {@link #KEY_HDR} header (may be {@code null}).
 */
public class KafkaSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);

    /** Event header whose value, when present, is used as the Kafka message key. */
    public static final String KEY_HDR = "key";

    /** Event header whose value, when present, overrides the configured topic. */
    public static final String TOPIC_HDR = "topic";

    /** Divisor used to convert {@link System#nanoTime()} deltas to milliseconds. */
    private static final long NANOS_PER_MILLI = 1000000L;

    private Properties kafkaProps;
    private Producer<String, byte[]> producer;
    private String topic;
    private int batchSize;
    /** Reusable buffer for the current batch; cleared at the start of every {@link #process()}. */
    private List<KeyedMessage<String, byte[]>> messageList;
    private KafkaSinkCounter counter;

    /**
     * Takes up to {@code batchSize} events from the channel inside a single
     * transaction, sends them to Kafka as one batch and commits.
     *
     * @return {@link Sink.Status#READY} after a successful commit
     * @throws EventDeliveryException if taking or sending events fails; the
     *         transaction is rolled back before this is thrown
     */
    public Sink.Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = null;
        try {
            long processedEvents = 0;
            transaction = channel.getTransaction();
            transaction.begin();
            this.messageList.clear();

            while (processedEvents < this.batchSize) {
                Event event = channel.take();
                if (event == null) {
                    // Nothing more to take from the channel for this batch.
                    break;
                }
                byte[] eventBody = event.getBody();
                Map<String, String> headers = event.getHeaders();

                // A per-event topic header wins over the statically configured topic.
                String eventTopic = headers.get(TOPIC_HDR);
                if (eventTopic == null) {
                    eventTopic = this.topic;
                }
                String eventKey = headers.get(KEY_HDR);

                if (logger.isDebugEnabled()) {
                    logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
                            + new String(eventBody, "UTF-8"));
                    logger.debug("event #{}", Long.valueOf(processedEvents));
                }

                this.messageList.add(new KeyedMessage<>(eventTopic, eventKey, eventBody));
                processedEvents++;
            }

            if (processedEvents > 0) {
                long startNanos = System.nanoTime();
                this.producer.send(this.messageList);
                this.counter.addToKafkaEventSendTimer((System.nanoTime() - startNanos) / NANOS_PER_MILLI);
                this.counter.addToEventDrainSuccessCount(this.messageList.size());
            }

            transaction.commit();
            // NOTE(review): READY is returned even when no events were taken;
            // confirm whether BACKOFF would be more appropriate for an empty batch.
            return Sink.Status.READY;
        } catch (Exception e) {
            logger.error("Failed to publish events", e);
            if (transaction != null) {
                try {
                    transaction.rollback();
                    this.counter.incrementRollbackCount();
                } catch (Exception rollbackFailure) {
                    logger.error("Transaction rollback failed", rollbackFailure);
                    throw Throwables.propagate(rollbackFailure);
                }
            }
            throw new EventDeliveryException("Failed to publish events", e);
        } finally {
            // Always release the transaction, whether we committed, rolled back or failed.
            if (transaction != null) {
                transaction.close();
            }
        }
    }

    /**
     * Creates the Kafka producer from the properties captured in
     * {@link #configure(Context)}, starts the counter and then the sink.
     */
    public synchronized void start() {
        this.producer = new Producer<>(new ProducerConfig(this.kafkaProps));
        this.counter.start();
        super.start();
    }

    /**
     * Closes the Kafka producer (if one was created), stops the counter, logs
     * the final metrics and stops the sink.
     */
    public synchronized void stop() {
        // The producer is only created in start(); guard so that stopping a sink
        // that never started (or whose start failed) still completes shutdown.
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
        this.counter.stop();
        logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), this.counter);
        super.stop();
    }

    /**
     * Reads the batch size, default topic and Kafka producer properties from
     * the given context and lazily creates the sink counter.
     *
     * @param context sink configuration
     */
    public void configure(Context context) {
        this.batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE, 100).intValue();
        this.messageList = new ArrayList<>(this.batchSize);
        logger.debug("Using batch size: {}", Integer.valueOf(this.batchSize));

        this.topic = context.getString("topic", KafkaSinkConstants.DEFAULT_TOPIC);
        if (this.topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
            logger.warn("The Property 'topic' is not set. Using the default topic name: {}",
                    KafkaSinkConstants.DEFAULT_TOPIC);
        } else {
            logger.info("Using the static topic: {} this may be over-ridden by event headers", this.topic);
        }

        this.kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
        if (logger.isDebugEnabled()) {
            logger.debug("Kafka producer properties: " + this.kafkaProps);
        }

        // Keep the existing counter across re-configuration.
        if (this.counter == null) {
            this.counter = new KafkaSinkCounter(getName());
        }
    }
}
