package org.apache.kafka.streams.integration.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import kafka.server.ConfigType;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    // NOTE(review): besides its apparent meaning (port 0), this constant is used throughout
    // the file wherever a plain zero is needed (array index of the first broker, loop start,
    // config values). That looks like a decompiler artifact -- confirm before changing the value.
    private static final int DEFAULT_BROKER_PORT = 0;
    // NOTE(review): not referenced in the visible code; the literal 30000L is passed instead
    // (see createTopic). Presumably milliseconds -- confirm.
    private static final int TOPIC_CREATION_TIMEOUT = 30000;
    // NOTE(review): not referenced in the visible code; the literal 30000L is passed instead
    // (see deleteTopicAndWait / deleteTopicsAndWait). Presumably milliseconds -- confirm.
    private static final int TOPIC_DELETION_TIMEOUT = 30000;
    // ZooKeeper instance created by start(); null until then.
    private EmbeddedZookeeper zookeeper;
    // One slot per broker, sized in the constructor and populated by start().
    private final KafkaEmbedded[] brokers;
    // Base configuration shared by all brokers; start() adds defaults to this very object.
    private final Properties brokerConfig;
    // Optional per-broker overrides; entry i is applied on top of brokerConfig for broker i.
    private final List<Properties> brokerConfigOverrides;
    // Mock clock handed to every KafkaEmbedded instance created by start().
    public final MockTime time;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster$TopicsDeletedCondition.class */
    /**
     * Test condition that holds once none of the given topics is listed in the cluster anymore.
     */
    public final class TopicsDeletedCondition implements TestCondition {
        /** Names of the topics that are expected to disappear. */
        final Set<String> deletedTopics;

        /** Builds the condition from a varargs list of topic names. */
        private TopicsDeletedCondition(String... strArr) {
            final Set<String> names = new HashSet<>();
            for (final String name : strArr) {
                names.add(name);
            }
            this.deletedTopics = names;
        }

        /** Builds the condition from a collection of topic names (copied, not retained). */
        private TopicsDeletedCondition(Collection<String> collection) {
            this.deletedTopics = new HashSet<>(collection);
        }

        /**
         * @return {@code true} when the current topic listing contains none of {@link #deletedTopics}
         */
        public boolean conditionMet() {
            final Set<String> currentTopics = EmbeddedKafkaCluster.this.getAllTopicsInCluster();
            // removeAll reports whether the snapshot changed, i.e. whether at least one
            // supposedly deleted topic is still listed.
            final boolean someStillPresent = currentTopics.removeAll(this.deletedTopics);
            return !someStillPresent;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster$TopicsRemainingCondition.class */
    /**
     * Test condition that holds once the cluster's topic listing equals exactly the given set.
     */
    private final class TopicsRemainingCondition implements TestCondition {
        /** The exact set of topic names expected to be listed. */
        final Set<String> remainingTopics;

        /** Builds the condition from a varargs list of topic names. */
        private TopicsRemainingCondition(String... strArr) {
            final Set<String> expected = new HashSet<>();
            for (final String name : strArr) {
                expected.add(name);
            }
            this.remainingTopics = expected;
        }

        /**
         * @return {@code true} when the current topic listing equals {@link #remainingTopics}
         */
        public boolean conditionMet() {
            final Set<String> currentTopics = EmbeddedKafkaCluster.this.getAllTopicsInCluster();
            return currentTopics.equals(this.remainingTopics);
        }
    }

    /**
     * Creates a cluster description with an empty base broker configuration.
     *
     * @param i number of brokers
     */
    public EmbeddedKafkaCluster(int i) {
        this(i, new Properties());
    }

    /**
     * Creates a cluster description whose mock clock is seeded with {@link System#currentTimeMillis()}.
     *
     * @param i          number of brokers
     * @param properties base configuration shared by all brokers
     */
    public EmbeddedKafkaCluster(int i, Properties properties) {
        this(i, properties, System.currentTimeMillis());
    }

    /**
     * Creates a cluster description without per-broker overrides (an empty list is passed on).
     *
     * @param i          number of brokers
     * @param properties base configuration shared by all brokers
     * @param j          first argument for the {@link MockTime} constructor
     *                   (presumably the start time in milliseconds -- confirm)
     */
    public EmbeddedKafkaCluster(int i, Properties properties, long j) {
        this(i, properties, Collections.emptyList(), j);
    }

    /**
     * Creates a cluster description with per-broker overrides and a mock clock seeded with
     * {@link System#currentTimeMillis()}.
     *
     * @param i          number of brokers
     * @param properties base configuration shared by all brokers
     * @param list       per-broker configuration overrides; either empty or one entry per broker
     */
    public EmbeddedKafkaCluster(int i, Properties properties, List<Properties> list) {
        this(i, properties, list, System.currentTimeMillis());
    }

    /**
     * Creates a cluster description, passing {@link System#nanoTime()} as the second
     * {@link MockTime} constructor argument.
     *
     * @param i          number of brokers
     * @param properties base configuration shared by all brokers
     * @param list       per-broker configuration overrides; either empty or one entry per broker
     * @param j          first argument for the {@link MockTime} constructor
     *                   (presumably the start time in milliseconds -- confirm)
     */
    public EmbeddedKafkaCluster(int i, Properties properties, List<Properties> list, long j) {
        this(i, properties, list, j, System.nanoTime());
    }

    /**
     * Primary constructor: validates the override list and records the cluster settings.
     * No broker or ZooKeeper instance is created here; that happens in {@link #start()}.
     *
     * @param i          number of brokers
     * @param properties base configuration shared by all brokers (kept by reference)
     * @param list       per-broker configuration overrides; either empty or one entry per broker
     * @param j          first argument for the {@link MockTime} constructor
     *                   (presumably the start time in milliseconds -- confirm)
     * @param j2         second argument for the {@link MockTime} constructor
     *                   (presumably the start time in nanoseconds -- confirm)
     * @throws IllegalArgumentException if {@code list} is non-empty and its size differs from {@code i}
     */
    public EmbeddedKafkaCluster(int i, Properties properties, List<Properties> list, long j, long j2) {
        // The zookeeper field is left at its default (null) until start() runs.
        if (!list.isEmpty()) {
            if (list.size() != i) {
                throw new IllegalArgumentException("Size of brokerConfigOverrides " + list.size() + " must match broker number " + i);
            }
        }
        this.brokerConfig = properties;
        this.brokerConfigOverrides = list;
        this.brokers = new KafkaEmbedded[i];
        this.time = new MockTime(j, j2);
    }

    /**
     * Starts a ZooKeeper instance and then one {@link KafkaEmbedded} broker per configured slot.
     *
     * <p>The base broker configuration is mutated in place: the ZooKeeper connect string is
     * always overwritten, the remaining defaults are only added when the key is absent, and the
     * broker id is rewritten for every broker. Each broker receives a copy of the base
     * configuration with its per-broker overrides (if any) applied on top.
     *
     * @throws IOException declared for callers; propagated from the components started here
     */
    public void start() throws IOException {
        log.debug("Initiating embedded Kafka cluster startup");
        log.debug("Starting a ZooKeeper instance");
        this.zookeeper = new EmbeddedZookeeper();
        final String zkConnect = zKConnectString();
        log.debug("ZooKeeper instance is running at {}", zkConnect);

        this.brokerConfig.put(KafkaConfig.ZkConnectProp(), zkConnect);
        // The port constant belongs here (and only here); plain zeros below are written as 0 so
        // that changing the default port cannot silently alter timeouts or loop bounds.
        putIfAbsent(this.brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
        putIfAbsent(this.brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
        putIfAbsent(this.brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2097152L);
        putIfAbsent(this.brokerConfig, KafkaConfig.GroupMinSessionTimeoutMsProp(), 0);
        putIfAbsent(this.brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
        putIfAbsent(this.brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) 1);
        putIfAbsent(this.brokerConfig, KafkaConfig.OffsetsTopicPartitionsProp(), 5);
        putIfAbsent(this.brokerConfig, KafkaConfig.TransactionsTopicPartitionsProp(), 5);
        putIfAbsent(this.brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), true);

        for (int brokerId = 0; brokerId < this.brokers.length; brokerId++) {
            this.brokerConfig.put(KafkaConfig.BrokerIdProp(), brokerId);

            final Properties effectiveConfig = new Properties();
            effectiveConfig.putAll(this.brokerConfig);
            if (this.brokerConfigOverrides != null && this.brokerConfigOverrides.size() > brokerId) {
                effectiveConfig.putAll(this.brokerConfigOverrides.get(brokerId));
            }

            // Log the listener from the effective configuration so that a per-broker
            // override of the listener is reflected in the message.
            log.debug("Starting a Kafka instance on {} ...", effectiveConfig.get(KafkaConfig.ListenersProp()));
            this.brokers[brokerId] = new KafkaEmbedded(effectiveConfig, this.time);
            log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", this.brokers[brokerId].brokerList(), this.brokers[brokerId].zookeeperConnect());
        }
    }

    /**
     * Stores {@code obj} under {@code str} in {@code properties} unless that key is already present.
     *
     * <p>The value is written to the same {@code Properties} object that is checked. Previously
     * the check used the argument while the write always went to the cluster's base broker
     * configuration, which only worked because every caller passed that very object.
     *
     * @param properties the properties to check and update
     * @param str        the configuration key
     * @param obj        the default value to store when the key is absent
     */
    private void putIfAbsent(Properties properties, String str, Object obj) {
        // Properties cannot hold null values, so "absent" here is equivalent to !containsKey(str).
        properties.putIfAbsent(str, obj);
    }

    /**
     * Shuts the cluster down: optionally deletes all topics, stops every broker, then stops ZooKeeper.
     *
     * <p>For clusters with more than one broker, all topics are deleted first through an admin
     * client of the first broker. Failures of that step are logged and do not prevent shutdown.
     * If the thread is interrupted while deleting topics, the brokers and ZooKeeper are still
     * shut down; afterwards the interrupt flag is restored and a {@link RuntimeException}
     * wrapping the {@link InterruptedException} is thrown.
     *
     * <p>Brokers that were never started and a ZooKeeper instance that was never created are
     * skipped, so calling this method before or after a failed {@link #start()} is safe.
     *
     * @throws RuntimeException if the thread was interrupted while deleting topics
     */
    public void stop() {
        RuntimeException interruption = null;

        if (this.brokers.length > 1 && this.brokers[0] != null) {
            final Set<String> topics = getAllTopicsInCluster();
            if (!topics.isEmpty()) {
                try (Admin adminClient = this.brokers[0].createAdminClient()) {
                    adminClient.deleteTopics(topics).all().get();
                } catch (InterruptedException e) {
                    log.warn("Got interrupted while deleting topics in preparation for stopping embedded brokers", e);
                    // Defer the failure so that brokers and ZooKeeper are not leaked.
                    interruption = new RuntimeException(e);
                } catch (RuntimeException | ExecutionException e) {
                    log.warn("Couldn't delete all topics before stopping brokers", e);
                }
            }
        }

        // Request shutdown of all brokers first, then wait for each, so they stop concurrently.
        for (final KafkaEmbedded broker : this.brokers) {
            if (broker != null) {
                broker.stopAsync();
            }
        }
        for (final KafkaEmbedded broker : this.brokers) {
            if (broker != null) {
                broker.awaitStoppedAndPurge();
            }
        }
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
        }

        if (interruption != null) {
            // Restore the flag only after shutdown so the blocking waits above are not cut short.
            Thread.currentThread().interrupt();
            throw interruption;
        }
    }

    /**
     * Builds the ZooKeeper connect string from the fixed host {@code 127.0.0.1} and the port
     * reported by the embedded ZooKeeper instance.
     *
     * <p>Dereferences the ZooKeeper field, which is only assigned in {@link #start()}.
     *
     * @return the connect string in {@code 127.0.0.1:<port>} form
     */
    public String zKConnectString() {
        return "127.0.0.1:" + this.zookeeper.port();
    }

    /**
     * Returns the broker list reported by the first broker of the cluster.
     *
     * <p>NOTE(review): {@code DEFAULT_BROKER_PORT} (value 0) is used here as the array index
     * of the first broker, not as a port.
     *
     * @return the value of {@code brokerList()} of the broker at index 0
     */
    public String bootstrapServers() {
        return this.brokers[DEFAULT_BROKER_PORT].brokerList();
    }

    /**
     * Creates every given topic, one after another, each with one partition, the value 1 for
     * the third {@code createTopic} argument and an empty configuration map.
     *
     * @param strArr names of the topics to create
     * @throws InterruptedException propagated from {@link #createTopic(String, int, int, Map)}
     */
    public void createTopics(String... strArr) throws InterruptedException {
        for (final String topicName : strArr) {
            createTopic(topicName, 1, 1, Collections.emptyMap());
        }
    }

    /**
     * Creates a topic with one partition, the value 1 for the third argument of the four-argument
     * overload (presumably the replication factor -- confirm) and an empty configuration map.
     *
     * @param str name of the topic
     * @throws InterruptedException propagated from the four-argument overload
     */
    public void createTopic(String str) throws InterruptedException {
        createTopic(str, 1, 1, Collections.emptyMap());
    }

    /**
     * Creates a topic with an empty configuration map.
     *
     * @param str name of the topic
     * @param i   number of partitions
     * @param i2  passed through to the broker's {@code createTopic}
     *            (presumably the replication factor -- confirm)
     * @throws InterruptedException propagated from the four-argument overload
     */
    public void createTopic(String str, int i, int i2) throws InterruptedException {
        createTopic(str, i, i2, Collections.emptyMap());
    }

    /**
     * Creates a topic through the first broker and then waits (with a 30000 timeout value)
     * for partitions {@code 0 .. i-1} of that topic on all brokers.
     *
     * @param str name of the topic
     * @param i   number of partitions; also the number of partitions waited for
     * @param i2  passed through to the broker's {@code createTopic}
     *            (presumably the replication factor -- confirm)
     * @param map passed through to the broker's {@code createTopic}
     *            (presumably topic-level configuration -- confirm)
     * @throws InterruptedException propagated from the wait for the topic partitions
     */
    public void createTopic(String str, int i, int i2, Map<String, String> map) throws InterruptedException {
        this.brokers[0].createTopic(str, i, i2, map);

        final List<TopicPartition> expectedPartitions = new ArrayList<>();
        int partition = 0;
        while (partition < i) {
            expectedPartitions.add(new TopicPartition(str, partition));
            partition++;
        }
        IntegrationTestUtils.waitForTopicPartitions(brokers(), expectedPartitions, 30000L);
    }

    /**
     * Requests deletion of a single topic without waiting for it to disappear.
     *
     * <p>Passes a timeout of {@code -1}; {@link #deleteTopicsAndWait(long, String...)} only
     * waits when the timeout is positive.
     *
     * @param str name of the topic to delete
     * @throws InterruptedException propagated from {@link #deleteTopicsAndWait(long, String...)}
     */
    public void deleteTopic(String str) throws InterruptedException {
        deleteTopicsAndWait(-1L, str);
    }

    /**
     * Requests deletion of a single topic and waits, with a timeout value of 30000, until it is
     * no longer listed in the cluster.
     *
     * @param str name of the topic to delete
     * @throws InterruptedException propagated from {@link #deleteTopicsAndWait(long, String...)}
     */
    public void deleteTopicAndWait(String str) throws InterruptedException {
        deleteTopicsAndWait(30000L, str);
    }

    /**
     * Requests deletion of several topics without waiting for them to disappear.
     *
     * <p>Passes a timeout of {@code -1}; {@link #deleteTopicsAndWait(long, String...)} only
     * waits when the timeout is positive.
     *
     * @param strArr names of the topics to delete
     * @throws InterruptedException propagated from {@link #deleteTopicsAndWait(long, String...)}
     */
    public void deleteTopics(String... strArr) throws InterruptedException {
        deleteTopicsAndWait(-1L, strArr);
    }

    /**
     * Requests deletion of several topics and waits, with a timeout value of 30000, until none
     * of them is listed in the cluster anymore.
     *
     * @param strArr names of the topics to delete
     * @throws InterruptedException propagated from {@link #deleteTopicsAndWait(long, String...)}
     */
    public void deleteTopicsAndWait(String... strArr) throws InterruptedException {
        deleteTopicsAndWait(30000L, strArr);
    }

    /**
     * Requests deletion of the given topics through the first broker and, for a positive
     * timeout, waits until none of them is listed in the cluster anymore.
     *
     * @param j      timeout handed to the wait; a value of zero or less skips the wait entirely
     * @param strArr names of the topics to delete
     * @throws InterruptedException propagated from the wait
     */
    public void deleteTopicsAndWait(long j, String... strArr) throws InterruptedException {
        for (final String topicName : strArr) {
            try {
                this.brokers[0].deleteTopic(topicName);
            } catch (UnknownTopicOrPartitionException ignored) {
                // Deliberately ignored: deletion of this topic is simply skipped.
            }
        }
        if (j <= 0) {
            return;
        }
        TestUtils.waitForCondition(new TopicsDeletedCondition(strArr), j, "Topics not deleted after " + j + " milli seconds.");
    }

    /**
     * Requests deletion of every topic currently listed in the cluster and, for a positive
     * timeout, waits until none of them is listed anymore.
     *
     * <p>The set of topics is captured once at the start; topics created afterwards are not affected.
     *
     * @param j timeout handed to the wait; a value of zero or less skips the wait entirely
     * @throws InterruptedException propagated from the wait
     */
    public void deleteAllTopicsAndWait(long j) throws InterruptedException {
        final Set<String> topicsAtCall = getAllTopicsInCluster();
        for (final String topicName : topicsAtCall) {
            try {
                this.brokers[0].deleteTopic(topicName);
            } catch (UnknownTopicOrPartitionException ignored) {
                // Deliberately ignored: deletion of this topic is simply skipped.
            }
        }
        if (j <= 0) {
            return;
        }
        TestUtils.waitForCondition(new TopicsDeletedCondition(topicsAtCall), j, "Topics not deleted after " + j + " milli seconds.");
    }

    /**
     * Waits until the cluster's topic listing equals exactly the given set of topic names.
     *
     * @param j      timeout handed to the wait
     * @param strArr the exact topic names expected to remain
     * @throws InterruptedException propagated from the wait
     */
    public void waitForRemainingTopics(long j, String... strArr) throws InterruptedException {
        final TopicsRemainingCondition onlyExpectedRemain = new TopicsRemainingCondition(strArr);
        final String failureMessage = "Topics are not expected after " + j + " milli seconds.";
        TestUtils.waitForCondition(onlyExpectedRemain, j, failureMessage);
    }

    /**
     * Collects the {@link KafkaServer} of every embedded broker, in broker order.
     *
     * @return a new list with one server per broker
     */
    private List<KafkaServer> brokers() {
        final List<KafkaServer> servers = new ArrayList<>();
        for (final KafkaEmbedded broker : this.brokers) {
            servers.add(broker.kafkaServer());
        }
        return servers;
    }

    /**
     * Reads the entity configuration of type {@code ConfigType.Topic()} for the given name
     * through the ZooKeeper client of the first broker.
     *
     * <p>NOTE(review): {@code DEFAULT_BROKER_PORT} (value 0) is used here as the array index
     * of the first broker, not as a port.
     *
     * @param str entity name to look up (presumably the topic name -- confirm)
     * @return the properties returned by the ZooKeeper client's {@code getEntityConfigs}
     */
    public Properties getLogConfig(String str) {
        return this.brokers[DEFAULT_BROKER_PORT].kafkaServer().zkClient().getEntityConfigs(ConfigType.Topic(), str);
    }

    /**
     * Lists all topics known to the ZooKeeper client of the first broker.
     *
     * @return a new, mutable set holding the topic names returned by the ZooKeeper client
     */
    public Set<String> getAllTopicsInCluster() {
        final Set<String> topicNames = new HashSet<>();
        // Drain the Scala iterator into a plain Java set.
        for (scala.collection.Iterator<?> cursor = this.brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator(); cursor.hasNext(); ) {
            topicNames.add((String) cursor.next());
        }
        return topicNames;
    }
}
