/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.util;

import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.exception.KafkaTopicException;
import io.confluent.ksql.util.CommonUtils;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTopicClientImpl
implements KafkaTopicClient {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicClient.class);
    private static final int NUM_RETRIES = 5;
    private static final int RETRY_BACKOFF_MS = 500;
    private final AdminClient adminClient;
    private boolean isDeleteTopicEnabled = false;
    private Supplier<String> ksqlDefaultStream;

    public KafkaTopicClientImpl(AdminClient adminClient, Supplier<String> ksqlDefaultStream) {
        this.adminClient = adminClient;
        this.ksqlDefaultStream = ksqlDefaultStream;
        this.init();
    }

    @Override
    public void createTopic(String topic, int numPartitions, short replicatonFactor, boolean isCompacted) {
        this.createTopic(topic, numPartitions, replicatonFactor, Collections.emptyMap(), isCompacted);
    }

    @Override
    public void createTopic(String topic, int numPartitions, short replicationFactor, Map<String, String> configs, boolean isCompacted) {
        if (this.isTopicExists(topic)) {
            this.validateTopicProperties(topic, numPartitions, replicationFactor);
            return;
        }
        NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
        HashMap<String, String> newTopicConfigs = new HashMap<String, String>();
        newTopicConfigs.putAll(configs);
        if (isCompacted) {
            newTopicConfigs.put("cleanup.policy", "compact");
        }
        newTopic.configs(newTopicConfigs);
        try {
            log.info("Creating topic '{}'", (Object)topic);
            RetryHelper retryHelper = new RetryHelper();
            retryHelper.executeWithRetries(() -> this.adminClient.createTopics(Collections.singleton(newTopic)).all());
        }
        catch (InterruptedException e) {
            throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " + topic, e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                this.validateTopicProperties(topic, numPartitions, replicationFactor);
                return;
            }
            throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " + topic, e);
        }
    }

    @Override
    public boolean isTopicExists(String topic) {
        log.trace("Checking for existence of topic '{}'", (Object)topic);
        String[] streamAndTopic = topic.split(":");
        if (streamAndTopic.length > 1) {
            return this.listTopicNames(streamAndTopic[0]).contains(streamAndTopic[1]);
        }
        return this.listTopicNames().contains(topic);
    }

    @Override
    public Set<String> listTopicNames(String stream) {
        try {
            return (Set)this.adminClient.listTopics(stream).names().get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new KafkaResponseGetFailedException("Failed to retrieve Kafka Topic names", e);
        }
    }

    @Override
    public Set<String> listTopicNames() {
        try {
            RetryHelper retryHelper = new RetryHelper();
            if (!this.ksqlDefaultStream.get().isEmpty()) {
                return (Set)retryHelper.executeWithRetries(() -> this.adminClient.listTopics(this.ksqlDefaultStream.get()).names());
            }
            throw new KafkaException("Cannot get listTopicNames() without default stream name");
        }
        catch (InterruptedException | ExecutionException e) {
            throw new KafkaResponseGetFailedException("Failed to retrieve Kafka Topic names", e);
        }
    }

    @Override
    public Map<String, TopicDescription> describeTopics(Collection<String> topicNames) {
        try {
            RetryHelper retryHelper = new RetryHelper();
            Collection topicNamesWithStreamName = topicNames.stream().map(topic -> CommonUtils.decorateTopicWithDefaultStreamIfNeeded((String)topic, (String)this.ksqlDefaultStream.get())).collect(Collectors.toSet());
            return (Map)retryHelper.executeWithRetries(() -> this.adminClient.describeTopics(topicNamesWithStreamName).all());
        }
        catch (InterruptedException | ExecutionException e) {
            throw new KafkaResponseGetFailedException("Failed to Describe Kafka Topics", e);
        }
    }

    @Override
    public KafkaTopicClient.TopicCleanupPolicy getTopicCleanupPolicy(String topicName) {
        RetryHelper retryHelper = new RetryHelper();
        Map configMap = null;
        try {
            configMap = (Map)retryHelper.executeWithRetries(() -> this.adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName))).all());
        }
        catch (Exception e) {
            throw new KsqlException("Could not get the topic configs for : " + topicName, (Throwable)e);
        }
        if (configMap == null) {
            throw new KsqlException("Could not get the topic configs for : " + topicName);
        }
        Object[] configValues = ((Config)configMap.values().stream().findFirst().get()).entries().stream().filter(configEntry -> configEntry.name().equalsIgnoreCase("cleanup.policy")).toArray();
        if (configValues == null || configValues.length == 0) {
            throw new KsqlException("Could not get the topic configs for : " + topicName);
        }
        switch (((ConfigEntry)configValues[0]).value().toString().toLowerCase()) {
            case "compact": {
                return KafkaTopicClient.TopicCleanupPolicy.COMPACT;
            }
            case "delete": {
                return KafkaTopicClient.TopicCleanupPolicy.DELETE;
            }
            case "compact+delete": {
                return KafkaTopicClient.TopicCleanupPolicy.COMPACT_DELETE;
            }
        }
        throw new KsqlException("Could not get the topic configs for : " + topicName);
    }

    @Override
    public void deleteTopics(List<String> topicsToDelete) {
        if (!this.isDeleteTopicEnabled) {
            log.info("Cannot delete topics since 'delete.topic.enable' is false. ");
            return;
        }
        DeleteTopicsResult deleteTopicsResult = this.adminClient.deleteTopics(topicsToDelete);
        Map results = deleteTopicsResult.values();
        ArrayList failList = new ArrayList();
        for (Map.Entry entry : results.entrySet()) {
            try {
                ((KafkaFuture)entry.getValue()).get(30L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                failList.add(entry.getKey());
            }
        }
        if (!failList.isEmpty()) {
            throw new KsqlException("Failed to clean up topics: " + failList.stream().collect(Collectors.joining(",")));
        }
    }

    @Override
    public void deleteInternalTopics(String applicationId) {
        if (!this.isDeleteTopicEnabled) {
            log.warn("Cannot delete topics since 'delete.topic.enable' is false. ");
            return;
        }
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get((Configuration)conf);
            String appDir = "/apps/kafka-streams/" + applicationId;
            Path p = new Path(appDir);
            if (fs.exists(p)) {
                fs.delete(p, true);
            }
        }
        catch (Exception e) {
            log.error("Exception while trying to clean up internal topics for application id: {}.", (Object)applicationId, (Object)e);
        }
    }

    private void init() {
        try {
            DescribeClusterResult describeClusterResult = this.adminClient.describeCluster();
            ArrayList nodes = new ArrayList((Collection)describeClusterResult.nodes().get());
            if (nodes.isEmpty()) {
                log.warn("No available broker found to fetch config info.");
                throw new KsqlException("Could not fetch broker information. KSQL cannot initialize AdminCLient.");
            }
            ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(((Node)nodes.get(0)).id()));
            this.isDeleteTopicEnabled = true;
        }
        catch (InterruptedException | ExecutionException ex) {
            log.error("Failed to initialize TopicClient: {}", (Object)ex.getMessage());
            throw new KsqlException("Could not fetch broker information. KSQL cannot initialize AdminClient.", (Throwable)ex);
        }
    }

    private boolean isInternalTopic(String topicName, String applicationId) {
        return topicName.startsWith(applicationId + "-") && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
    }

    @Override
    public void close() {
        this.adminClient.close();
    }

    private void validateTopicProperties(String topic, int numPartitions, short replicationFactor) {
        Map<String, TopicDescription> topicDescriptions = this.describeTopics(Collections.singletonList(topic));
        TopicDescription topicDescription = topicDescriptions.get(topic);
        if (topicDescription.partitions().size() != numPartitions || ((TopicPartitionInfo)topicDescription.partitions().get(0)).replicas().size() < replicationFactor) {
            throw new KafkaTopicException(String.format("Topic '%s' does not conform to the requirements Partitions:%d v %d. Replication: %d v %d", topic, topicDescription.partitions().size(), numPartitions, ((TopicPartitionInfo)topicDescription.partitions().get(0)).replicas().size(), replicationFactor));
        }
        log.debug("Did not create topic {} with {} partitions and replication-factor {} since it already exists", new Object[]{topic, numPartitions, replicationFactor});
    }

    private static class RetryHelper<T> {
        private RetryHelper() {
        }

        T executeWithRetries(Supplier<KafkaFuture<T>> supplier) throws InterruptedException, ExecutionException {
            int retries = 0;
            ExecutionException lastException = null;
            while (retries < 5) {
                try {
                    if (retries != 0) {
                        Thread.sleep(500L);
                    }
                    return (T)supplier.get().get();
                }
                catch (ExecutionException e) {
                    if (e.getCause() instanceof RetriableException) {
                        log.info("Retrying admin request due to retriable exception. Retry no: " + ++retries, (Throwable)e);
                        lastException = e;
                        continue;
                    }
                    throw e;
                }
            }
            throw new ExecutionException(lastException);
        }
    }
}

