/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka.cluster;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.store.kafka.TestQueryConstants;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedKafkaCluster
implements TestQueryConstants {
    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private List<KafkaServerStartable> brokers;
    private final ZookeeperHelper zkHelper;
    private final Properties props = new Properties();

    public EmbeddedKafkaCluster() throws IOException {
        this(new Properties());
    }

    public EmbeddedKafkaCluster(Properties props) throws IOException {
        this(props, 1);
    }

    public EmbeddedKafkaCluster(Properties basePorps, int numberOfBrokers) throws IOException {
        this.props.putAll((Map<?, ?>)basePorps);
        this.zkHelper = new ZookeeperHelper();
        this.zkHelper.startZookeeper(1);
        this.brokers = new ArrayList<KafkaServerStartable>(numberOfBrokers);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < numberOfBrokers; ++i) {
            if (i != 0) {
                sb.append(",");
            }
            int ephemeralBrokerPort = this.getEphemeralPort();
            sb.append("127.0.0.1:" + ephemeralBrokerPort);
            this.addBroker(this.props, i, ephemeralBrokerPort);
        }
        this.props.put("metadata.broker.list", sb.toString());
        this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
        logger.info("Initialized Kafka Server");
    }

    private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) {
        Properties properties = new Properties();
        properties.putAll((Map<?, ?>)props);
        properties.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), String.valueOf(1));
        properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), String.valueOf(1));
        properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
        properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1));
        properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100));
        properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE);
        properties.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
        properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
        properties.put(KafkaConfig.HostNameProp(), String.valueOf("127.0.0.1"));
        properties.put(KafkaConfig.AdvertisedHostNameProp(), String.valueOf("127.0.0.1"));
        properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort));
        properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.FALSE);
        properties.put(KafkaConfig.LogDirsProp(), this.getTemporaryDir().getAbsolutePath());
        properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
        this.brokers.add(EmbeddedKafkaCluster.getBroker(properties));
    }

    private static KafkaServerStartable getBroker(Properties properties) {
        KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig((Map)properties));
        broker.startup();
        return broker;
    }

    public void shutDownCluster() throws IOException {
        Level level = LogManager.getLogger((String)"kafka").getLevel();
        LogManager.getLogger((String)"kafka").setLevel(Level.ERROR);
        for (KafkaServerStartable broker : this.brokers) {
            broker.shutdown();
        }
        LogManager.getLogger((String)"kafka").setLevel(level);
        this.zkHelper.stopZookeeper();
    }

    public void shutDownBroker(int brokerId) {
        for (KafkaServerStartable broker : this.brokers) {
            if (Integer.parseInt(broker.staticServerConfig().getString(KafkaConfig.BrokerIdProp())) != brokerId) continue;
            broker.shutdown();
            return;
        }
    }

    public Properties getProps() {
        Properties tmpProps = new Properties();
        tmpProps.putAll((Map<?, ?>)this.props);
        return tmpProps;
    }

    public List<KafkaServerStartable> getBrokers() {
        return this.brokers;
    }

    public void setBrokers(List<KafkaServerStartable> brokers) {
        this.brokers = brokers;
    }

    public ZookeeperHelper getZkServer() {
        return this.zkHelper;
    }

    public String getKafkaBrokerList() {
        StringBuilder sb = new StringBuilder();
        for (KafkaServerStartable broker : this.brokers) {
            KafkaConfig serverConfig = broker.staticServerConfig();
            sb.append(serverConfig.hostName() + ":" + serverConfig.port());
            sb.append(",");
        }
        return sb.toString().substring(0, sb.toString().length() - 1);
    }

    private int getEphemeralPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0);){
            int n = socket.getLocalPort();
            return n;
        }
    }

    private File getTemporaryDir() {
        File file = new File(System.getProperty("java.io.tmpdir"), "zk_tmp" + System.nanoTime());
        if (!file.mkdir()) {
            logger.error("Failed to create temp Dir");
            throw new RuntimeException("Failed to create temp Dir");
        }
        return file;
    }
}

