package org.apache.htrace.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.htrace.Transport;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.shaded.commons.logging.Log;
import org.apache.htrace.shaded.commons.logging.LogFactory;
import org.apache.htrace.shaded.kafka.javaapi.producer.Producer;
import org.apache.htrace.shaded.kafka.producer.KeyedMessage;
import org.apache.htrace.shaded.kafka.producer.ProducerConfig;

/* loaded from: input_file:org/apache/htrace/impl/KafkaTransport.class */
public class KafkaTransport implements Transport {
    private static final Log LOG = LogFactory.getLog(KafkaTransport.class);
    private static final String DEFAULT_TOPIC = "zipkin";
    public static final String TOPIC_KEY = "zipkin.kafka.topic";
    Producer<byte[], byte[]> producer;
    private boolean isOpen = false;
    private String topic;

    @Override // org.apache.htrace.Transport
    public void open(HTraceConfiguration hTraceConfiguration) throws IOException, IllegalStateException {
        if (isOpen()) {
            LOG.warn("Attempted to open an already opened transport");
            return;
        }
        this.topic = hTraceConfiguration.get(TOPIC_KEY, "zipkin");
        this.producer = newProducer(hTraceConfiguration);
        this.isOpen = true;
    }

    @Override // org.apache.htrace.Transport
    public boolean isOpen() {
        return this.isOpen;
    }

    @Override // org.apache.htrace.Transport
    public void send(List<byte[]> list) throws IOException {
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<byte[]> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new KeyedMessage(this.topic, it.next()));
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("sending " + arrayList.size() + " entries");
        }
        this.producer.send(arrayList);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!this.isOpen) {
            LOG.warn("Attempted to close already closed transport");
        } else {
            this.producer.close();
            this.isOpen = false;
        }
    }

    public Producer<byte[], byte[]> newProducer(HTraceConfiguration hTraceConfiguration) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", hTraceConfiguration.get("zipkin.kafka.metadata.broker.list", "localhost:9092"));
        properties.put("request.required.acks", hTraceConfiguration.get("zipkin.kafka.request.required.acks", "0"));
        properties.put("producer.type", hTraceConfiguration.get("zipkin.kafka.producer.type", "async"));
        properties.put("serializer.class", hTraceConfiguration.get("zipkin.kafka.serializer.class", "org.apache.htrace.shaded.kafka.serializer.DefaultEncoder"));
        properties.put("compression.codec", hTraceConfiguration.get("zipkin.kafka.compression.codec", "1"));
        Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(properties));
        LOG.info("Connected to Kafka transport \n" + properties);
        return producer;
    }
}
