package org.apache.sqoop.connector.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.kafka.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kafka.configuration.ToJobConfiguration;
import org.apache.sqoop.error.code.KafkaConnectorErrors;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;

/* loaded from: input_file:WEB-INF/lib/sqoop-connector-kafka-1.99.6-mapr-1507.jar:org/apache/sqoop/connector/kafka/KafkaLoader.class */
/**
 * Sqoop {@link Loader} that reads text records from the job's data reader and
 * publishes them to a Kafka topic in batches.
 *
 * <p>Records are buffered in memory and handed to the producer whenever
 * {@link #BATCH_SIZE} messages have accumulated; any remainder is sent once the
 * reader is exhausted.
 */
public class KafkaLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
    private static final Logger LOG = Logger.getLogger(KafkaLoader.class);

    /** Number of buffered messages that triggers a send to Kafka. */
    private static final int BATCH_SIZE = 100;

    private Producer<String, String> producer;
    private final List<KeyedMessage<String, String>> messageList = new ArrayList<>(BATCH_SIZE);
    private long rowsWritten = 0;

    /**
     * Reads every text record from the loader context and sends it to the
     * configured topic.
     *
     * @param loaderContext supplies the data reader the records are pulled from
     * @param linkConfiguration supplies the broker list used to build the producer
     * @param toJobConfiguration supplies the destination topic name
     * @throws SqoopException with {@code KAFKA_CONNECTOR_0001} if a batch cannot be sent
     * @throws Exception if reading a record from the data reader fails
     */
    @Override // org.apache.sqoop.job.etl.Loader
    public void load(LoaderContext loaderContext, LinkConfiguration linkConfiguration, ToJobConfiguration toJobConfiguration) throws Exception {
        this.producer = getProducer(linkConfiguration);
        LOG.info("got producer");
        try {
            String topic = toJobConfiguration.toJobConfig.topic;
            LOG.info("topic is:" + topic);
            // One UUID per load() call, passed as the third KeyedMessage argument
            // (the partition key in the Kafka 0.8 producer API -- confirm).
            String batchUuid = UUID.randomUUID().toString();
            String record;
            while ((record = loaderContext.getDataReader().readTextRecord()) != null) {
                this.messageList.add(new KeyedMessage<String, String>(topic, null, batchUuid, record));
                if (this.messageList.size() >= BATCH_SIZE) {
                    sendToKafka(this.messageList);
                }
                // Counted once the record is buffered (and, on a batch boundary,
                // after that batch was sent without throwing).
                this.rowsWritten++;
            }
            // Flush whatever is left over from the last partial batch.
            if (!this.messageList.isEmpty()) {
                sendToKafka(this.messageList);
            }
        } finally {
            // Always release the producer, even when reading or sending failed.
            this.producer.close();
        }
    }

    /**
     * Sends the given messages through the producer and empties the list on
     * success. On failure the list is left untouched.
     *
     * @param list messages to send; cleared after a successful send
     * @throws SqoopException with {@code KAFKA_CONNECTOR_0001}, wrapping the
     *         original failure as its cause
     */
    private void sendToKafka(List<KeyedMessage<String, String>> list) {
        try {
            this.producer.send(list);
            list.clear();
        } catch (Exception e) {
            // Keep the underlying exception so the real reason is not lost.
            throw new SqoopException(KafkaConnectorErrors.KAFKA_CONNECTOR_0001, e);
        }
    }

    /**
     * Builds a producer from the default properties plus the broker list taken
     * from the link configuration.
     *
     * @param linkConfiguration supplies {@code linkConfig.brokerList}
     * @return a new producer; the caller is responsible for closing it
     */
    Producer<String, String> getProducer(LinkConfiguration linkConfiguration) {
        Properties kafkaProps = generateDefaultKafkaProps();
        kafkaProps.put(KafkaConstants.BROKER_LIST_KEY, linkConfiguration.linkConfig.brokerList);
        return new Producer<String, String>(new ProducerConfig(kafkaProps));
    }

    /**
     * Creates the baseline producer properties: string encoders for key and
     * message, plus the connector's default required-acks and producer-type
     * values from {@link KafkaConstants}.
     *
     * @return a fresh, mutable {@link Properties} instance
     */
    private Properties generateDefaultKafkaProps() {
        Properties properties = new Properties();
        properties.put(KafkaConstants.MESSAGE_SERIALIZER_KEY, "kafka.serializer.StringEncoder");
        properties.put(KafkaConstants.KEY_SERIALIZER_KEY, "kafka.serializer.StringEncoder");
        properties.put(KafkaConstants.REQUIRED_ACKS_KEY, KafkaConstants.DEFAULT_REQUIRED_ACKS);
        properties.put(KafkaConstants.PRODUCER_TYPE, KafkaConstants.DEFAULT_PRODUCER_TYPE);
        return properties;
    }

    /**
     * @return the value of the row counter maintained by {@link #load}
     */
    @Override // org.apache.sqoop.job.etl.Loader
    public long getRowsWritten() {
        return this.rowsWritten;
    }
}
