package org.apache.drill.exec.store.kafka;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode$Disabled$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.ZookeeperTestUtil;
import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit suite that runs {@link KafkaQueriesTest}, {@link MessageIteratorTest} and
 * {@link MessageReaderFactoryTest} against a single shared embedded Kafka cluster.
 *
 * <p>The cluster is reference-counted: every successful {@link #initKafka()} call takes one
 * reference and every {@link #tearDownCluster()} call releases one. The cluster is started by the
 * call that finds the count at zero and stopped by the call that brings it back to zero. Both
 * methods serialize on the {@code TestKafkaSuit.class} monitor.
 */
@RunWith(Suite.class)
@Category({KafkaStorageTest.class, SlowTest.class})
@Suite.SuiteClasses({KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class})
public class TestKafkaSuit {
    /** Classpath resource whose location is published as the JAAS login configuration. */
    private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";

    /** Shared embedded cluster; assigned by the {@link #initKafka()} call that starts it. */
    public static EmbeddedKafkaCluster embeddedKafkaCluster;

    /** ZooKeeper client used for topic administration; closed when the last reference is released. */
    private static ZkClient zkClient;

    /** Message count handed to the generator when populating {@code QueryConstants.JSON_TOPIC}. */
    static final int NUM_JSON_MSG = 10;

    /** Connection timeout passed to the {@link ZkClient} constructor. */
    static final int CONN_TIMEOUT = 8000;

    /** Session timeout passed to the {@link ZkClient} constructor. */
    static final int SESSION_TIMEOUT = 10000;

    // Not read or written inside this class; kept because it is package-visible.
    static String kafkaBroker;

    // Must be bound to this class: binding it to LoggerFactory attributes every message
    // from the suite to the logging framework itself.
    private static final Logger logger = LoggerFactory.getLogger(TestKafkaSuit.class);

    /** Number of outstanding {@link #initKafka()} references; only touched under the class monitor. */
    private static final AtomicInteger initCount = new AtomicInteger(0);

    /** Set to {@code true} by every {@link #initKafka()} call; never reset by this class. */
    private static volatile boolean runningSuite = false;

    /**
     * Takes a reference on the shared embedded cluster, starting it first if no reference is
     * currently held.
     *
     * <p>Start-up configures the ZooKeeper SASL test properties and the JAAS login configuration,
     * boots the embedded cluster, creates {@code QueryConstants.JSON_TOPIC} with one partition and
     * a replication factor of one, and publishes {@link #NUM_JSON_MSG} JSON messages to it.
     *
     * @throws Exception if any start-up step fails; the reference count is left unchanged in that case
     */
    @BeforeClass
    public static void initKafka() throws Exception {
        synchronized (TestKafkaSuit.class) {
            if (initCount.get() == 0) {
                ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
                System.setProperty("java.security.auth.login.config",
                    ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile());

                embeddedKafkaCluster = new EmbeddedKafkaCluster();
                String zkConnectString = embeddedKafkaCluster.getZkServer().getConnectionString();
                zkClient = new ZkClient(zkConnectString, SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$);
                ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkConnectString), false);

                Properties topicProps = new Properties();
                AdminUtils.createTopic(zkUtils, QueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode$Disabled$.MODULE$);
                logger.info("Topic Metadata: {}", AdminUtils.fetchTopicMetadataFromZk(QueryConstants.JSON_TOPIC, zkUtils));

                KafkaMessageGenerator generator =
                    new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
                generator.populateJsonMsgIntoKafka(QueryConstants.JSON_TOPIC, NUM_JSON_MSG);
            }
            initCount.incrementAndGet();
            runningSuite = true;
        }
        logger.info("Initialized Embedded Zookeeper and Kafka");
    }

    /**
     * Reports whether {@link #initKafka()} has completed at least once in this JVM.
     *
     * @return {@code true} once any {@link #initKafka()} call has succeeded
     */
    public static boolean isRunningSuite() {
        return runningSuite;
    }

    /**
     * Releases one reference on the shared cluster and shuts everything down when the last
     * reference is released.
     *
     * <p>A call made while no reference is held is a no-op. JUnit runs {@code @AfterClass} methods
     * even when a {@code @BeforeClass} method threw, so this can happen after a failed
     * {@link #initKafka()}; letting the counter drop below zero would make the next
     * {@link #initKafka()} skip cluster start-up.
     *
     * @throws Exception if closing the ZooKeeper client or stopping the cluster fails
     */
    @AfterClass
    public static void tearDownCluster() throws Exception {
        synchronized (TestKafkaSuit.class) {
            if (initCount.get() <= 0) {
                // Unpaired call: nothing was acquired, so keep the counter at zero.
                return;
            }
            if (initCount.decrementAndGet() > 0) {
                // Other test classes still hold a reference on the cluster.
                return;
            }
            try {
                if (zkClient != null) {
                    zkClient.close();
                }
            } finally {
                // Stop the brokers even if closing the ZooKeeper client failed.
                if (embeddedKafkaCluster != null && !embeddedKafkaCluster.getBrokers().isEmpty()) {
                    embeddedKafkaCluster.shutDownCluster();
                }
            }
        }
    }
}
