package org.apache.sqoop.common.test.kafka;

import java.io.File;
import java.io.IOException;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.sqoop.common.test.utils.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sqoop/common/test/kafka/KafkaLocalRunner.class */
/**
 * {@link KafkaRunnerBase} implementation that runs a Kafka broker together with a
 * {@link ZooKeeperLocal} instance inside the current JVM, for use in tests.
 *
 * <p>The constructor obtains two ports from {@link NetworkUtils#findAvailablePort()},
 * creates the ZooKeeper helper and prepares (but does not start) the Kafka broker.
 * The broker itself is started by {@link #start()} and torn down by {@link #stop()}.
 *
 * <p>Construction is best-effort: a failure while creating ZooKeeper or the broker is
 * logged rather than thrown. In that case {@link #start()} fails with an
 * {@link IllegalStateException} and {@link #stop()} releases whatever was created.
 */
public class KafkaLocalRunner extends KafkaRunnerBase {
    private static final Logger logger = LoggerFactory.getLogger(KafkaLocalRunner.class);

    /** Kafka broker wrapper; {@code null} if construction failed before it was created. */
    public KafkaServerStartable kafka;

    /** Local ZooKeeper helper; {@code null} if construction failed before it was created. */
    public ZooKeeperLocal zookeeperServer;

    /** Broker configuration built from {@link #getKafkaProperties()}; {@code null} on failed construction. */
    private KafkaConfig kafkaConfig;

    /** Port the Kafka broker is configured to listen on. */
    private final int kafkaLocalPort;

    /** Port handed to {@link ZooKeeperLocal}. */
    private final int zkLocalPort;

    /**
     * Picks ports and creates the ZooKeeper and Kafka server objects.
     *
     * <p>Any exception raised while creating them is logged and swallowed so that the
     * instance can still be cleaned up through {@link #stop()}.
     *
     * @throws IOException declared for callers; propagated from port discovery if it fails
     * @throws InterruptedException declared for callers; propagated from port discovery if interrupted
     */
    public KafkaLocalRunner() throws IOException, InterruptedException {
        this.kafkaLocalPort = NetworkUtils.findAvailablePort();
        this.zkLocalPort = NetworkUtils.findAvailablePort();
        logger.info("Starting kafka server with kafka port {} and zookeeper port {}",
                this.kafkaLocalPort, this.zkLocalPort);
        try {
            this.zookeeperServer = new ZooKeeperLocal(this.zkLocalPort);
            logger.info("ZooKeeper instance is successfully started on port {}", this.zkLocalPort);
            this.kafkaConfig = new KafkaConfig(getKafkaProperties());
            this.kafka = new KafkaServerStartable(this.kafkaConfig);
            // The broker is only created here; it starts listening in start().
            logger.info("Kafka Server instance is created for port {}", this.kafkaLocalPort);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interrupt request just because we keep going.
                Thread.currentThread().interrupt();
            }
            logger.error("Error starting the Kafka Server.", e);
        }
    }

    /**
     * Builds the broker configuration: broker id 0, the chosen Kafka port, a log
     * directory under {@code target/kafka-logs}, one partition per topic, and the
     * connect string reported by the local ZooKeeper helper.
     *
     * <p>Requires {@link #zookeeperServer} to be non-null.
     *
     * @return a new, mutable {@link Properties} instance
     */
    Properties getKafkaProperties() {
        Properties properties = new Properties();
        properties.put("broker.id", "0");
        properties.put("port", Integer.toString(this.kafkaLocalPort));
        properties.put("log.dirs", "target/kafka-logs");
        properties.put("num.partitions", "1");
        properties.put("zookeeper.connect", this.zookeeperServer.getConnectString());
        return properties;
    }

    /**
     * Starts the Kafka broker prepared by the constructor.
     *
     * @throws IllegalStateException if the constructor failed to create the broker
     * @throws Exception if the broker fails to start
     */
    @Override // org.apache.sqoop.common.test.kafka.KafkaRunnerBase
    public void start() throws Exception {
        if (this.kafka == null) {
            throw new IllegalStateException(
                    "Kafka server was not initialized; see the earlier start-up error in the log.");
        }
        this.kafka.startup();
        logger.info("Kafka Server is successfully started on port {}", this.kafkaLocalPort);
    }

    /**
     * Shuts down the broker, stops ZooKeeper and deletes the broker's first log directory.
     *
     * <p>Each step runs even if an earlier one throws, and steps whose component was
     * never created are skipped, so this method is safe to call after a failed
     * construction.
     *
     * @throws IOException if stopping ZooKeeper or deleting the log directory fails
     */
    @Override // org.apache.sqoop.common.test.kafka.KafkaRunnerBase
    public void stop() throws IOException {
        try {
            if (this.kafka != null) {
                this.kafka.shutdown();
            }
        } finally {
            try {
                if (this.zookeeperServer != null) {
                    this.zookeeperServer.stopZookeeper();
                }
            } finally {
                if (this.kafkaConfig != null) {
                    File logDir = new File((String) this.kafkaConfig.logDirs().head()).getAbsoluteFile();
                    FileUtils.deleteDirectory(logDir);
                }
            }
        }
    }

    /**
     * @return the connect string reported by the local ZooKeeper helper
     */
    @Override // org.apache.sqoop.common.test.kafka.KafkaRunnerBase
    public String getZkConnectionString() {
        return this.zookeeperServer.getConnectString();
    }

    /**
     * @return the broker address in {@code localhost:<port>} form
     */
    @Override // org.apache.sqoop.common.test.kafka.KafkaRunnerBase
    public String getKafkaUrl() {
        return "localhost:" + this.kafkaLocalPort;
    }
}
