/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka.cluster;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.store.kafka.KafkaAsyncCloser;
import org.apache.drill.exec.store.kafka.TestQueryConstants;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/**
 * Test helper that starts a ZooKeeper instance through {@link ZookeeperHelper} and one or more
 * in-process {@link KafkaServer} brokers listening on 127.0.0.1 with ephemeral ports.
 *
 * <p>Everything is started in the constructor; call {@link #shutDownCluster()} to release the
 * resources. The class performs no synchronization of its own.
 */
public class EmbeddedKafkaCluster implements TestQueryConstants {
    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final String LOCAL_HOST = "127.0.0.1";

    private List<KafkaServer> brokers;
    private ZookeeperHelper zkHelper;
    private KafkaAsyncCloser closer;
    private final Properties props = new Properties();

    /**
     * Starts a single-broker cluster with no extra properties.
     *
     * @throws IOException if an ephemeral port cannot be obtained
     */
    public EmbeddedKafkaCluster() throws IOException {
        this(new Properties());
    }

    /**
     * Starts a single-broker cluster.
     *
     * @param props properties copied into every broker configuration
     * @throws IOException if an ephemeral port cannot be obtained
     */
    public EmbeddedKafkaCluster(Properties props) throws IOException {
        this(props, 1);
    }

    /**
     * Starts ZooKeeper and then {@code numberOfBrokers} brokers.
     *
     * <p>If any step fails, whatever was already started is shut down again before the original
     * exception is rethrown, so a failed construction does not leave servers running.
     *
     * @param baseProps properties copied into every broker configuration
     * @param numberOfBrokers number of brokers to start; broker ids are assigned from 1 upwards
     * @throws IOException if an ephemeral port cannot be obtained
     */
    public EmbeddedKafkaCluster(Properties baseProps, int numberOfBrokers) throws IOException {
        this.props.putAll(baseProps);
        try {
            this.zkHelper = new ZookeeperHelper();
            this.zkHelper.startZookeeper(1);
            this.brokers = new ArrayList<>(numberOfBrokers);
            StringBuilder brokerList = new StringBuilder();
            for (int i = 0; i < numberOfBrokers; ++i) {
                if (i != 0) {
                    brokerList.append(",");
                }
                int ephemeralBrokerPort = getEphemeralPort();
                brokerList.append(LOCAL_HOST).append(":").append(ephemeralBrokerPort);
                addBroker(this.props, i, ephemeralBrokerPort);
            }
            // These two entries are added only after the brokers are up, so they end up in
            // getProps() but are not part of the properties each broker was started from.
            this.props.put("metadata.broker.list", brokerList.toString());
            this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
            logger.info("Initialized Kafka Server");
            this.closer = new KafkaAsyncCloser();
        } catch (IOException | RuntimeException e) {
            abortStartup(e);
            throw e;
        }
    }

    /**
     * Builds the configuration for one broker on top of {@code props} and starts it.
     *
     * @param props base properties; copied, never modified
     * @param brokerID zero-based index of the broker; the Kafka broker id is {@code brokerID + 1}
     * @param ephemeralBrokerPort port the broker listens on and advertises
     */
    private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) {
        Properties properties = new Properties();
        properties.putAll(props);
        properties.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), String.valueOf(1));
        properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), String.valueOf(1));
        properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
        properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1));
        properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100));
        properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE);
        properties.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
        properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
        properties.put(KafkaConfig.HostNameProp(), LOCAL_HOST);
        properties.put(KafkaConfig.AdvertisedHostNameProp(), LOCAL_HOST);
        properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort));
        properties.put(KafkaConfig.AdvertisedPortProp(), String.valueOf(ephemeralBrokerPort));
        properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
        properties.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath());
        properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
        this.brokers.add(getBroker(properties));
    }

    /**
     * Creates a {@link KafkaServer} from the given properties and starts it.
     *
     * @param properties complete broker configuration
     * @return the started broker
     */
    private static KafkaServer getBroker(Properties properties) {
        KafkaServer broker =
            new KafkaServer(new KafkaConfig(properties), Time.SYSTEM, Option.apply("kafka"), false);
        broker.startup();
        return broker;
    }

    /**
     * Closes the registered resources, shuts down every broker and stops ZooKeeper.
     *
     * <p>Each stage runs even if an earlier one throws, and calling this method again after a
     * completed shutdown is a no-op.
     */
    public void shutDownCluster() {
        releaseResources();
    }

    /**
     * Shuts down the broker with the given Kafka broker id, if there is one. Does nothing when
     * the cluster has already been shut down.
     *
     * @param brokerId Kafka broker id (ids are assigned starting at 1)
     */
    public void shutDownBroker(int brokerId) {
        if (this.brokers == null) {
            return;
        }
        // The broker id is read with getInt rather than getString; see the Kafka AbstractConfig
        // and broker.id documentation for the declared type of this setting.
        this.brokers.stream()
            .filter(broker -> broker.config().getInt(KafkaConfig.BrokerIdProp()) == brokerId)
            .findAny()
            .ifPresent(KafkaServer::shutdown);
    }

    /**
     * @return a copy of the cluster properties; changes to the returned object do not affect
     *     this cluster
     */
    public Properties getProps() {
        Properties copy = new Properties();
        copy.putAll(this.props);
        return copy;
    }

    /**
     * @return the internal broker list itself (not a copy); {@code null} after
     *     {@link #shutDownCluster()}
     */
    public List<KafkaServer> getBrokers() {
        return this.brokers;
    }

    /**
     * Replaces the broker list used by this cluster.
     *
     * @param brokers new broker list; stored as-is, without copying
     */
    public void setBrokers(List<KafkaServer> brokers) {
        this.brokers = brokers;
    }

    /**
     * @return the ZooKeeper helper; {@code null} after {@link #shutDownCluster()}
     */
    public ZookeeperHelper getZkServer() {
        return this.zkHelper;
    }

    /**
     * @return comma-separated {@code host:port} entries, one per broker, taken from each broker's
     *     configuration
     */
    public String getKafkaBrokerList() {
        return this.brokers.stream()
            .map(KafkaServer::config)
            .map(serverConfig -> serverConfig.hostName() + ":" + serverConfig.port())
            .collect(Collectors.joining(","));
    }

    /**
     * Hands the resource to this cluster's {@link KafkaAsyncCloser}.
     *
     * @param autoCloseable resource passed to {@code KafkaAsyncCloser.close(AutoCloseable)}
     */
    public void registerToClose(AutoCloseable autoCloseable) {
        this.closer.close(autoCloseable);
    }

    /**
     * Tears down closer, brokers and ZooKeeper in that order. Every field is cleared before its
     * resource is released and the stages are chained with {@code finally}, so a failure in one
     * stage neither skips the remaining ones nor causes a second attempt on the same resource.
     */
    private void releaseResources() {
        KafkaAsyncCloser activeCloser = this.closer;
        this.closer = null;
        try {
            if (activeCloser != null) {
                activeCloser.close();
            }
        } finally {
            List<KafkaServer> activeBrokers = this.brokers;
            this.brokers = null;
            try {
                if (activeBrokers != null) {
                    activeBrokers.forEach(KafkaServer::shutdown);
                }
            } finally {
                ZookeeperHelper activeZk = this.zkHelper;
                this.zkHelper = null;
                if (activeZk != null) {
                    activeZk.stopZookeeper();
                }
            }
        }
    }

    /**
     * Best-effort cleanup after a failed start. A failure during cleanup is attached to
     * {@code cause} as a suppressed exception so that the original error stays the one reported.
     *
     * @param cause the exception that aborted startup
     */
    private void abortStartup(Throwable cause) {
        try {
            releaseResources();
        } catch (RuntimeException cleanupFailure) {
            cause.addSuppressed(cleanupFailure);
        }
    }

    /**
     * Asks the OS for a free port by briefly binding a server socket to port 0.
     *
     * <p>The socket is closed before this method returns, so the port is no longer held when the
     * broker later binds to it.
     *
     * @return the port number the temporary socket was bound to
     * @throws IOException if the socket cannot be opened
     */
    private int getEphemeralPort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    /**
     * Creates a fresh directory under {@code java.io.tmpdir}; used as a broker's log directory.
     *
     * @return the newly created directory
     * @throws RuntimeException if the directory could not be created
     */
    private File getTemporaryDir() {
        File file = new File(System.getProperty("java.io.tmpdir"), "zk_tmp" + System.nanoTime());
        if (!file.mkdir()) {
            logger.error("Failed to create temp Dir");
            throw new RuntimeException("Failed to create temp Dir");
        }
        return file;
    }
}

