/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.util;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicAdmin
implements AutoCloseable {
    /**
     * Sentinel partition count. {@link NewTopicBuilder} starts with this same value ({@code -1});
     * presumably it tells the broker to apply its own default -- TODO confirm against the broker docs.
     */
    public static final int NO_PARTITIONS = -1;
    /**
     * Sentinel replication factor. {@link NewTopicBuilder} starts with this same value ({@code -1});
     * presumably it tells the broker to apply its own default -- TODO confirm against the broker docs.
     */
    public static final short NO_REPLICATION_FACTOR = -1;
    // Topic-level config key read by topicCleanupPolicy() and written by NewTopicBuilder.compacted().
    private static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
    // The only cleanup policy accepted by verifyTopicCleanupPolicyOnlyCompact().
    private static final String CLEANUP_POLICY_COMPACT = "compact";
    // Topic-level config key written by NewTopicBuilder.minInSyncReplicas().
    private static final String MIN_INSYNC_REPLICAS_CONFIG = "min.insync.replicas";
    // Topic-level config key written by NewTopicBuilder.uncleanLeaderElection().
    private static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = "unclean.leader.election.enable";
    private static final Logger log = LoggerFactory.getLogger(TopicAdmin.class);
    // Configuration the admin client was built from; only consulted for "bootstrap.servers" in log/error messages.
    private final Map<String, Object> adminConfig;
    // Client used for every broker call made by this class; closed by close().
    private final Admin admin;
    // When true, createTopics() logs each successfully created topic at INFO level.
    private final boolean logCreation;

    /**
     * Starts a fluent definition of a topic with the given name.
     *
     * @param topicName the name the built topic will carry
     * @return a new {@link NewTopicBuilder} for that name
     */
    public static NewTopicBuilder defineTopic(String topicName) {
        return new NewTopicBuilder(topicName);
    }

    /**
     * Creates an instance backed by a new {@link Admin} client built from the given configuration.
     *
     * @param adminConfig the configuration handed to {@link Admin#create(Map)}
     */
    public TopicAdmin(Map<String, Object> adminConfig) {
        this(adminConfig, Admin.create(adminConfig));
    }

    /**
     * Creates an instance around an existing admin client, with topic-creation logging enabled.
     *
     * @param adminConfig the configuration associated with the client; used for log messages
     * @param adminClient the client through which all broker calls are made
     */
    TopicAdmin(Map<String, Object> adminConfig, Admin adminClient) {
        this(adminConfig, adminClient, true);
    }

    /**
     * Creates an instance around an existing admin client.
     *
     * @param adminConfig the configuration associated with the client; a null value is replaced
     *                    by an empty map
     * @param adminClient the client through which all broker calls are made
     * @param logCreation whether successfully created topics are logged at INFO level
     */
    TopicAdmin(Map<String, Object> adminConfig, Admin adminClient, boolean logCreation) {
        if (adminConfig == null) {
            this.adminConfig = Collections.emptyMap();
        } else {
            this.adminConfig = adminConfig;
        }
        this.admin = adminClient;
        this.logCreation = logCreation;
    }

    /**
     * Attempts to create a single topic.
     *
     * @param topic the topic definition; may be null
     * @return true if {@link #createTopics(NewTopic...)} reported the topic as newly created,
     *         false if {@code topic} is null or the topic was not reported as created
     */
    public boolean createTopic(NewTopic topic) {
        // Short-circuit keeps a null topic from ever reaching createTopics().
        return topic != null && this.createTopics(topic).contains(topic.name());
    }

    /**
     * Attempts to create the given topics and reports which of them were newly created.
     *
     * <p>Null entries are skipped; if several entries share a name, the last one is used. Each
     * creation result is awaited in turn. A topic that already exists is logged at DEBUG level
     * and left out of the result. If the brokers do not support the CreateTopics API, or the
     * caller is not authorized to create topics, the method logs at DEBUG level and returns an
     * empty set immediately.
     *
     * @param topics the topic definitions; may be null, empty, or contain nulls
     * @return the names of the topics created by this call; never null
     * @throws ConnectException if a topic configuration is invalid, the request times out, any
     *                          other error occurs, or the calling thread is interrupted
     */
    public Set<String> createTopics(NewTopic ... topics) {
        Map<String, NewTopic> topicsByName = new HashMap<>();
        if (topics != null) {
            for (NewTopic topic : topics) {
                if (topic != null) {
                    topicsByName.put(topic.name(), topic);
                }
            }
        }
        if (topicsByName.isEmpty()) {
            return Collections.emptySet();
        }
        String bootstrapServers = this.bootstrapServers();
        String topicNameList = Utils.join(topicsByName.keySet(), "', '");
        CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
        Map<String, KafkaFuture<Void>> newResults = this.admin.createTopics(topicsByName.values(), args).values();
        Set<String> newlyCreatedTopicNames = new HashSet<>();
        for (Map.Entry<String, KafkaFuture<Void>> entry : newResults.entrySet()) {
            String topic = entry.getKey();
            try {
                entry.getValue().get();
                if (this.logCreation) {
                    log.info("Created topic {} on brokers at {}", topicsByName.get(topic), bootstrapServers);
                }
                newlyCreatedTopicNames.add(topic);
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof TopicExistsException) {
                    log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers);
                    continue;
                }
                if (cause instanceof UnsupportedVersionException) {
                    log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API. Falling back to assume topic(s) exist or will be auto-created by the broker.", topicNameList, bootstrapServers);
                    return Collections.emptySet();
                }
                if (cause instanceof ClusterAuthorizationException || cause instanceof TopicAuthorizationException) {
                    log.debug("Not authorized to create topic(s) '{}' upon the brokers {}. Falling back to assume topic(s) exist or will be auto-created by the broker.", topicNameList, bootstrapServers);
                    return Collections.emptySet();
                }
                if (cause instanceof InvalidConfigurationException) {
                    throw new ConnectException("Unable to create topic(s) '" + topicNameList + "': " + cause.getMessage(), cause);
                }
                if (cause instanceof TimeoutException) {
                    throw new ConnectException("Timed out while checking for or creating topic(s) '" + topicNameList + "'. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.", cause);
                }
                throw new ConnectException("Error while attempting to create/find topic(s) '" + topicNameList + "'", e);
            } catch (InterruptedException e) {
                // The interrupt status is already cleared when InterruptedException is thrown;
                // re-assert it so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new ConnectException("Interrupted while attempting to create/find topic(s) '" + topicNameList + "'", e);
            }
        }
        return newlyCreatedTopicNames;
    }

    /**
     * Describes the given topics and returns the descriptions of those that exist.
     *
     * <p>Topics the brokers report as unknown are logged at DEBUG level and omitted from the
     * result.
     *
     * @param topics the topic names; null yields an empty map
     * @return a map from topic name to its description, containing only topics that were found
     * @throws RetriableException if the request times out or the calling thread is interrupted
     * @throws ConnectException if the caller is not authorized, the brokers do not support the
     *                          DescribeTopics API, or any other error occurs
     */
    public Map<String, TopicDescription> describeTopics(String ... topics) {
        if (topics == null) {
            return Collections.emptyMap();
        }
        String bootstrapServers = this.bootstrapServers();
        String topicNameList = String.join(", ", topics);
        Map<String, KafkaFuture<TopicDescription>> newResults = this.admin.describeTopics(Arrays.asList(topics), new DescribeTopicsOptions()).values();
        Map<String, TopicDescription> existingTopics = new HashMap<>();
        for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : newResults.entrySet()) {
            String topic = entry.getKey();
            try {
                existingTopics.put(topic, entry.getValue().get());
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    // A missing topic is not an error here; it is simply left out of the result.
                    log.debug("Topic '{}' does not exist on the brokers at {}", topic, bootstrapServers);
                    continue;
                }
                if (cause instanceof ClusterAuthorizationException || cause instanceof TopicAuthorizationException) {
                    String msg = String.format("Not authorized to describe topic(s) '%s' on the brokers %s", topicNameList, bootstrapServers);
                    throw new ConnectException(msg, cause);
                }
                if (cause instanceof UnsupportedVersionException) {
                    String msg = String.format("Unable to describe topic(s) '%s' since the brokers at %s do not support the DescribeTopics API.", topicNameList, bootstrapServers);
                    throw new ConnectException(msg, cause);
                }
                if (cause instanceof TimeoutException) {
                    throw new RetriableException("Timed out while describing topics '" + topicNameList + "'", cause);
                }
                throw new ConnectException("Error while attempting to describe topics '" + topicNameList + "'", e);
            } catch (InterruptedException e) {
                // The interrupt status is already cleared when InterruptedException is thrown;
                // re-assert it so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RetriableException("Interrupted while attempting to describe topics '" + topicNameList + "'", e);
            }
        }
        return existingTopics;
    }

    /**
     * Checks that the given topic's cleanup policy is exactly {@code compact}.
     *
     * @param topic             the topic to inspect
     * @param workerTopicConfig the worker property that named the topic; used in the error message
     * @param topicPurpose      a description of what the topic stores; used in the error message
     * @return true if the policy was read and is only {@code compact}; false if no policy could
     *         be determined
     * @throws ConfigException if a policy was read and it is anything other than only {@code compact}
     */
    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig, String topicPurpose) {
        Set<String> actual = this.topicCleanupPolicy(topic);
        if (actual.isEmpty()) {
            log.info("Unable to use admin client to verify the cleanup policy of '{}' topic is '{}', either because the broker is an older version or because the Kafka principal used for Connect internal topics does not have the required permission to describe topic configurations.", topic, CLEANUP_POLICY_COMPACT);
            return false;
        }

        Set<String> required = Collections.singleton(CLEANUP_POLICY_COMPACT);
        if (actual.equals(required)) {
            return true;
        }

        // Policy mismatch: build a message that names both the required and the actual policy.
        String requiredStr = String.join(",", required);
        String actualStr = String.join(",", actual);
        throw new ConfigException(String.format(
                "Topic '%s' supplied via the '%s' property is required to have '%s=%s' to guarantee consistency and durability of %s, but found the topic currently has '%s=%s'. Continuing would likely result in eventually losing %s and problems restarting this Connect cluster in the future. Change the '%s' property in the Connect worker configurations to use a topic with '%s=%s'.",
                topic, workerTopicConfig, CLEANUP_POLICY_CONFIG, requiredStr, topicPurpose,
                CLEANUP_POLICY_CONFIG, actualStr, topicPurpose, workerTopicConfig,
                CLEANUP_POLICY_CONFIG, requiredStr));
    }

    /**
     * Reads the {@code cleanup.policy} setting of a topic as a set of policy names.
     *
     * <p>The raw value is split on commas; each part is trimmed and lower-cased, and empty parts
     * are dropped.
     *
     * @param topic the topic to inspect
     * @return the policy names, or an empty set if no configuration was obtained for the topic
     *         or it has no {@code cleanup.policy} value
     */
    public Set<String> topicCleanupPolicy(String topic) {
        Config topicConfig = this.describeTopicConfig(topic);
        if (topicConfig == null) {
            log.debug("Unable to find topic '{}' when getting cleanup policy", topic);
            return Collections.emptySet();
        }

        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
        String policyStr = entry == null ? null : entry.value();
        if (policyStr == null) {
            log.debug("Found no cleanup.policy for topic '{}'", topic);
            return Collections.emptySet();
        }

        log.debug("Found cleanup.policy={} for topic '{}'", policyStr, topic);
        Set<String> policies = new HashSet<>();
        for (String part : policyStr.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                policies.add(trimmed.toLowerCase());
            }
        }
        return policies;
    }

    /**
     * Looks up the configuration of a single topic.
     *
     * @param topic the topic name
     * @return the entry that {@link #describeTopicConfigs(String...)} holds under exactly this
     *         name, or null if there is no such entry or it is mapped to null
     */
    public Config describeTopicConfig(String topic) {
        Map<String, Config> configsByTopic = this.describeTopicConfigs(topic);
        return configsByTopic.get(topic);
    }

    /**
     * Fetches the topic-level configuration of the given topics.
     *
     * <p>Null names are ignored, remaining names are trimmed, and names that are empty after
     * trimming are dropped. The outcome per topic is:
     * <ul>
     *   <li>configuration retrieved: the topic maps to its {@link Config};</li>
     *   <li>topic unknown to the brokers: the topic maps to {@code null};</li>
     *   <li>not authorized, or the describe-configs API is unsupported: a DEBUG message is
     *       logged and the topic is left out of the result.</li>
     * </ul>
     *
     * @param topicNames the topic names; null yields an empty map
     * @return a map keyed by (trimmed) topic name as described above; never null
     * @throws RetriableException if the request times out or the calling thread is interrupted
     * @throws ConnectException if any other error occurs
     */
    public Map<String, Config> describeTopicConfigs(String ... topicNames) {
        if (topicNames == null) {
            return Collections.emptyMap();
        }
        Collection<String> topics = Arrays.stream(topicNames)
                .filter(Objects::nonNull)
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
        if (topics.isEmpty()) {
            return Collections.emptyMap();
        }
        String bootstrapServers = this.bootstrapServers();
        String topicNameList = String.join(", ", topics);
        Collection<ConfigResource> resources = topics.stream()
                .map(t -> new ConfigResource(ConfigResource.Type.TOPIC, t))
                .collect(Collectors.toList());
        Map<ConfigResource, KafkaFuture<Config>> newResults = this.admin.describeConfigs(resources, new DescribeConfigsOptions()).values();
        Map<String, Config> result = new HashMap<>();
        for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : newResults.entrySet()) {
            String topic = entry.getKey().name();
            try {
                result.put(topic, entry.getValue().get());
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                // The branches are mutually exclusive: the first three are tolerated outcomes and
                // must not fall through to the generic failure at the end of the chain.
                if (cause instanceof UnknownTopicOrPartitionException) {
                    log.debug("Topic '{}' does not exist on the brokers at {}", topic, bootstrapServers);
                    result.put(topic, null);
                } else if (cause instanceof ClusterAuthorizationException || cause instanceof TopicAuthorizationException) {
                    log.debug("Not authorized to describe topic config for topic '{}' on brokers at {}", topic, bootstrapServers);
                } else if (cause instanceof UnsupportedVersionException) {
                    log.debug("API to describe topic config for topic '{}' is unsupported on brokers at {}", topic, bootstrapServers);
                } else if (cause instanceof TimeoutException) {
                    String msg = String.format("Timed out while waiting to describe topic config for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new RetriableException(msg, e);
                } else {
                    String msg = String.format("Error while attempting to describe topic config for topic '%s' on brokers at %s", topic, bootstrapServers);
                    throw new ConnectException(msg, e);
                }
            } catch (InterruptedException e) {
                // The interrupt status is already cleared when InterruptedException is thrown;
                // re-assert it so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                String msg = String.format("Interrupted while attempting to describe topic configs '%s'", topicNameList);
                throw new RetriableException(msg, e);
            }
        }
        return result;
    }

    /**
     * Closes the underlying {@link Admin} client.
     */
    @Override
    public void close() {
        admin.close();
    }

    /**
     * Closes the underlying {@link Admin} client, passing the given timeout through to it.
     *
     * @param timeout the timeout handed to {@link Admin#close(Duration)}
     */
    public void close(Duration timeout) {
        admin.close(timeout);
    }

    /**
     * Returns the {@code bootstrap.servers} value from the admin configuration for use in
     * log and error messages.
     *
     * @return the configured value as a string, or {@code "<unknown>"} if none is set
     */
    private String bootstrapServers() {
        Object servers = this.adminConfig.get("bootstrap.servers");
        if (servers == null) {
            return "<unknown>";
        }
        return servers.toString();
    }

    /**
     * Fluent builder for {@link NewTopic} definitions.
     *
     * <p>A fresh builder uses {@link TopicAdmin#NO_PARTITIONS} and
     * {@link TopicAdmin#NO_REPLICATION_FACTOR} and carries no topic-level configs.
     */
    public static class NewTopicBuilder {
        private final String name;
        private int numPartitions = TopicAdmin.NO_PARTITIONS;
        private short replicationFactor = TopicAdmin.NO_REPLICATION_FACTOR;
        // Topic-level configs accumulated by the setters below and handed to NewTopic.configs().
        private final Map<String, String> configs = new HashMap<>();

        /**
         * @param name the name of the topic to define
         */
        NewTopicBuilder(String name) {
            this.name = name;
        }

        /**
         * Sets the number of partitions.
         *
         * @param numPartitions the partition count
         * @return this builder
         */
        public NewTopicBuilder partitions(int numPartitions) {
            this.numPartitions = numPartitions;
            return this;
        }

        /**
         * Resets the partition count to {@link TopicAdmin#NO_PARTITIONS}.
         *
         * @return this builder
         */
        public NewTopicBuilder defaultPartitions() {
            this.numPartitions = TopicAdmin.NO_PARTITIONS;
            return this;
        }

        /**
         * Sets the replication factor.
         *
         * @param replicationFactor the replication factor
         * @return this builder
         */
        public NewTopicBuilder replicationFactor(short replicationFactor) {
            this.replicationFactor = replicationFactor;
            return this;
        }

        /**
         * Resets the replication factor to {@link TopicAdmin#NO_REPLICATION_FACTOR}.
         *
         * @return this builder
         */
        public NewTopicBuilder defaultReplicationFactor() {
            this.replicationFactor = TopicAdmin.NO_REPLICATION_FACTOR;
            return this;
        }

        /**
         * Sets {@code cleanup.policy} to {@code compact}.
         *
         * @return this builder
         */
        public NewTopicBuilder compacted() {
            this.configs.put(TopicAdmin.CLEANUP_POLICY_CONFIG, TopicAdmin.CLEANUP_POLICY_COMPACT);
            return this;
        }

        /**
         * Sets {@code min.insync.replicas}.
         *
         * @param minInSyncReplicas the value to store, written as a decimal string
         * @return this builder
         */
        public NewTopicBuilder minInSyncReplicas(short minInSyncReplicas) {
            this.configs.put(TopicAdmin.MIN_INSYNC_REPLICAS_CONFIG, Short.toString(minInSyncReplicas));
            return this;
        }

        /**
         * Sets {@code unclean.leader.election.enable}.
         *
         * @param allow the value to store, written as {@code "true"} or {@code "false"}
         * @return this builder
         */
        public NewTopicBuilder uncleanLeaderElection(boolean allow) {
            this.configs.put(TopicAdmin.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, Boolean.toString(allow));
            return this;
        }

        /**
         * Merges the given settings into the topic-level configs, or clears them.
         *
         * <p>Each value is stored as its {@code toString()} form; a null value is stored as null.
         * Entries with keys already present are overwritten.
         *
         * @param configs the settings to add; passing null removes every config set so far
         * @return this builder
         */
        public NewTopicBuilder config(Map<String, Object> configs) {
            if (configs == null) {
                this.configs.clear();
                return this;
            }
            for (Map.Entry<String, Object> entry : configs.entrySet()) {
                Object value = entry.getValue();
                this.configs.put(entry.getKey(), value != null ? value.toString() : null);
            }
            return this;
        }

        /**
         * Builds the topic definition from the current state of this builder.
         *
         * @return a {@link NewTopic} with the configured name, partition count, replication
         *         factor and topic-level configs
         */
        public NewTopic build() {
            return new NewTopic(this.name, Optional.of(this.numPartitions), Optional.of(this.replicationFactor)).configs(this.configs);
        }
    }
}

