package org.apache.flume.sink.kafka.v09;

import com.google.common.base.Throwables;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/kafka/v09/KafkaSink.class */
public class KafkaSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
    private final Properties kafkaProps = new Properties();
    private KafkaProducer<String, byte[]> producer;
    private String topic;
    private int batchSize;
    private List<Future<RecordMetadata>> kafkaFutures;
    private KafkaSinkCounter counter;

    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        try {
            try {
                long j = 0;
                transaction = channel.getTransaction();
                transaction.begin();
                this.kafkaFutures.clear();
                long nanoTime = System.nanoTime();
                while (true) {
                    if (j >= this.batchSize) {
                        break;
                    }
                    Event take = channel.take();
                    if (take != null) {
                        byte[] body = take.getBody();
                        Map headers = take.getHeaders();
                        String str = (String) headers.get(KafkaSinkConstants.TOPIC_HEADER);
                        if (str == null) {
                            str = this.topic;
                        }
                        String str2 = (String) headers.get(KafkaSinkConstants.KEY_HEADER);
                        if (logger.isDebugEnabled()) {
                            logger.debug("{Event} " + str + " : " + str2 + " : " + new String(body, "UTF-8"));
                            logger.debug("event #{}", Long.valueOf(j));
                        }
                        this.kafkaFutures.add(this.producer.send(new ProducerRecord(str, str2, body), new SinkCallback(System.currentTimeMillis())));
                        j++;
                    } else if (j == 0) {
                        status = Sink.Status.BACKOFF;
                        this.counter.incrementBatchEmptyCount();
                    } else {
                        this.counter.incrementBatchUnderflowCount();
                    }
                }
                this.producer.flush();
                if (j > 0) {
                    Iterator<Future<RecordMetadata>> it = this.kafkaFutures.iterator();
                    while (it.hasNext()) {
                        it.next().get();
                    }
                    this.counter.addToKafkaEventSendTimer((System.nanoTime() - nanoTime) / 1000000);
                    this.counter.addToEventDrainSuccessCount(Long.valueOf(this.kafkaFutures.size()).longValue());
                }
                transaction.commit();
                if (transaction != null) {
                    transaction.close();
                }
                return status;
            } catch (Exception e) {
                logger.error("Failed to publish events", e);
                Sink.Status status2 = Sink.Status.BACKOFF;
                if (transaction != null) {
                    try {
                        this.kafkaFutures.clear();
                        transaction.rollback();
                        this.counter.incrementRollbackCount();
                    } catch (Exception e2) {
                        logger.error("Transaction rollback failed", e2);
                        throw Throwables.propagate(e2);
                    }
                }
                throw new EventDeliveryException("Failed to publish events", e);
            }
        } catch (Throwable th) {
            if (transaction != null) {
                transaction.close();
            }
            throw th;
        }
    }

    public synchronized void start() {
        this.producer = new KafkaProducer<>(this.kafkaProps);
        this.counter.start();
        super.start();
    }

    public synchronized void stop() {
        this.producer.close();
        this.counter.stop();
        logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), this.counter);
        super.stop();
    }

    public void configure(Context context) {
        String string = context.getString(KafkaSinkConstants.TOPIC_CONFIG);
        if (string == null || string.isEmpty()) {
            string = KafkaSinkConstants.DEFAULT_TOPIC;
            logger.warn("Topic was not specified. Using {} as the topic.", string);
        } else {
            logger.info("Using the static topic {}. This may be overridden by event headers", string);
        }
        this.topic = string;
        this.batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE, 100).intValue();
        if (logger.isDebugEnabled()) {
            logger.debug("Using batch size: {}", Integer.valueOf(this.batchSize));
        }
        this.kafkaFutures = new LinkedList();
        setProducerProps(context, context.getString(KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG));
        if (logger.isDebugEnabled()) {
            logger.debug("Kafka producer properties: {}", this.kafkaProps);
        }
        if (this.counter == null) {
            this.counter = new KafkaSinkCounter(getName());
        }
    }

    private void setProducerProps(Context context, String str) {
        this.kafkaProps.put("acks", KafkaSinkConstants.DEFAULT_ACKS);
        this.kafkaProps.put("key.serializer", KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
        this.kafkaProps.put("value.serializer", KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER);
        this.kafkaProps.putAll(context.getSubProperties(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX));
        if (str != null) {
            this.kafkaProps.put("bootstrap.servers", str);
        }
        logger.info("Producer properties: {}", this.kafkaProps.toString());
    }

    protected Properties getKafkaProps() {
        return this.kafkaProps;
    }
}
