package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManager.class */
public class InternalTopicManager {
    private static final String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
    private final long windowChangeLogAdditionalRetention;
    private final short replicationFactor;
    private final AdminClient adminClient;
    private final int retries;
    private final Map<String, String> defaultTopicConfigs = new HashMap();
    private final Logger log = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())).logger(getClass());

    public InternalTopicManager(AdminClient adminClient, StreamsConfig streamsConfig) {
        this.adminClient = adminClient;
        this.replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
        this.windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG).longValue();
        this.retries = new AdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(StreamsConfig.RETRIES_CONFIG).intValue();
        this.log.debug("Configs:" + Utils.NL, new Object[]{"\t{} = {}" + Utils.NL, "\t{} = {}" + Utils.NL, "\t{} = {}", StreamsConfig.RETRIES_CONFIG, Integer.valueOf(this.retries), StreamsConfig.REPLICATION_FACTOR_CONFIG, Short.valueOf(this.replicationFactor), StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.valueOf(this.windowChangeLogAdditionalRetention)});
        for (Map.Entry entry : streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX).entrySet()) {
            if (entry.getValue() != null) {
                this.defaultTopicConfigs.put((String) entry.getKey(), entry.getValue().toString());
            }
        }
    }

    public void makeReady(Map<String, InternalTopicConfig> map) {
        int i;
        Set<InternalTopicConfig> validateTopicPartitions = validateTopicPartitions(map.values(), getNumPartitions(map.keySet()));
        if (validateTopicPartitions.size() > 0) {
            HashSet hashSet = new HashSet();
            for (InternalTopicConfig internalTopicConfig : validateTopicPartitions) {
                Map<String, String> properties = internalTopicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention);
                this.log.debug("Going to create topic {} with {} partitions and config {}.", new Object[]{internalTopicConfig.name(), Integer.valueOf(internalTopicConfig.numberOfPartitions()), properties});
                hashSet.add(new NewTopic(internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), this.replicationFactor).configs(properties));
            }
            int i2 = this.retries;
            do {
                boolean z = false;
                CreateTopicsResult createTopics = this.adminClient.createTopics(hashSet);
                HashSet hashSet2 = new HashSet();
                for (Map.Entry entry : createTopics.values().entrySet()) {
                    try {
                        ((KafkaFuture) entry.getValue()).get();
                        hashSet2.add((String) entry.getKey());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        this.log.error(INTERRUPTED_ERROR_MESSAGE, e);
                        throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, e);
                    } catch (ExecutionException e2) {
                        Throwable cause = e2.getCause();
                        String str = (String) entry.getKey();
                        if (cause instanceof TimeoutException) {
                            z = true;
                            this.log.debug("Could not get number of partitions for topic {} due to timeout. Will try again (remaining retries {}).", str, Integer.valueOf(i2 - 1));
                        } else {
                            if (!(cause instanceof TopicExistsException)) {
                                throw new StreamsException(String.format("Could not create topic %s.", str), e2);
                            }
                            hashSet2.add((String) entry.getKey());
                            this.log.info(String.format("Topic %s exist already: %s", str, e2.getMessage()));
                        }
                    }
                }
                if (!z) {
                    return;
                }
                Iterator it = hashSet.iterator();
                while (it.hasNext()) {
                    if (hashSet2.contains(((NewTopic) it.next()).name())) {
                        it.remove();
                    }
                }
                i = i2;
                i2--;
            } while (i > 0);
            this.log.error("Could not create topics. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.");
            throw new StreamsException("Could not create topics. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.");
        }
    }

    protected Map<String, Integer> getNumPartitions(Set<String> set) {
        int i;
        int i2 = this.retries;
        do {
            boolean z = false;
            Map values = this.adminClient.describeTopics(set).values();
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : values.entrySet()) {
                try {
                    hashMap.put((String) entry.getKey(), Integer.valueOf(((TopicDescription) ((KafkaFuture) entry.getValue()).get()).partitions().size()));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.log.error(INTERRUPTED_ERROR_MESSAGE, e);
                    throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, e);
                } catch (ExecutionException e2) {
                    Throwable cause = e2.getCause();
                    if (cause instanceof TimeoutException) {
                        z = true;
                        this.log.debug("Could not get number of partitions for topic {} due to timeout. Will try again (remaining retries {}).", entry.getKey(), Integer.valueOf(i2 - 1));
                    } else {
                        this.log.debug("Could not get number of partitions for topic {}.", entry.getKey(), cause.getMessage());
                    }
                }
            }
            if (!z) {
                return hashMap;
            }
            set.removeAll(hashMap.keySet());
            i = i2;
            i2--;
        } while (i > 0);
        return Collections.emptyMap();
    }

    private Set<InternalTopicConfig> validateTopicPartitions(Collection<InternalTopicConfig> collection, Map<String, Integer> map) {
        HashSet hashSet = new HashSet();
        for (InternalTopicConfig internalTopicConfig : collection) {
            Integer valueOf = Integer.valueOf(internalTopicConfig.numberOfPartitions());
            if (!map.containsKey(internalTopicConfig.name())) {
                hashSet.add(internalTopicConfig);
            } else if (!map.get(internalTopicConfig.name()).equals(valueOf)) {
                String format = String.format("Existing internal topic %s has invalid partitions: expected: %d; actual: %d. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", internalTopicConfig.name(), valueOf, map.get(internalTopicConfig.name()));
                this.log.error(format);
                throw new StreamsException(format);
            }
        }
        return hashSet;
    }
}
