package io.confluent.ksql.util;

import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.exception.KafkaTopicException;
import io.confluent.ksql.util.KafkaTopicClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/util/KafkaTopicClientImpl.class */
public class KafkaTopicClientImpl implements KafkaTopicClient {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicClient.class);
    private static final int NUM_RETRIES = 5;
    private static final int RETRY_BACKOFF_MS = 500;
    private final AdminClient adminClient;
    private boolean isDeleteTopicEnabled = false;
    private Supplier<String> ksqlDefaultStream;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/util/KafkaTopicClientImpl$RetryHelper.class */
    /**
     * Issues an admin request and re-issues it when it fails with a retriable error.
     *
     * @param <T> type of the value carried by the request's {@link KafkaFuture}
     */
    public static class RetryHelper<T> {
        private RetryHelper() {
        }

        /**
         * Obtains a future from {@code supplier}, waits for it, and returns its value.
         *
         * <p>When waiting fails with an {@link ExecutionException} whose cause is a
         * {@link RetriableException}, the request is issued again, up to
         * {@code NUM_RETRIES} attempts in total. Every attempt after the first is preceded by a
         * pause of {@code RETRY_BACKOFF_MS} milliseconds.
         *
         * @param supplier creates a fresh request future for each attempt
         * @return the value produced by the first attempt that completes successfully
         * @throws InterruptedException if the thread is interrupted while sleeping or waiting
         * @throws ExecutionException   immediately for a non-retriable failure, or wrapping the last
         *                              retriable failure once all attempts are used up
         */
        T executeWithRetries(Supplier<KafkaFuture<T>> supplier) throws InterruptedException, ExecutionException {
            ExecutionException lastFailure = null;
            int retries = 0;
            while (retries < KafkaTopicClientImpl.NUM_RETRIES) {
                try {
                    if (retries != 0) {
                        Thread.sleep(KafkaTopicClientImpl.RETRY_BACKOFF_MS);
                    }
                    // The request itself must sit inside the try block: otherwise a retriable
                    // failure escapes on the first attempt and nothing is ever retried.
                    return supplier.get().get();
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof RetriableException)) {
                        throw e;
                    }
                    retries++;
                    KafkaTopicClientImpl.log.info("Retrying admin request due to retriable exception. Retry no: " + retries, e);
                    lastFailure = e;
                }
            }
            throw new ExecutionException(lastFailure);
        }
    }

    /**
     * Creates a topic client on top of the given admin client.
     *
     * <p>Stores both arguments and then runs {@code init()}, which queries the cluster nodes and
     * throws a {@code KsqlException} when that query fails or returns no node.
     *
     * @param adminClient admin client used for all cluster requests made by this class
     * @param supplier    supplies the default stream name read by {@code listTopicNames()} and
     *                    {@code describeTopics}
     */
    public KafkaTopicClientImpl(AdminClient adminClient, Supplier<String> supplier) {
        this.adminClient = adminClient;
        this.ksqlDefaultStream = supplier;
        // Queries the cluster right away, so construction fails fast when it is unreachable.
        init();
    }

    /**
     * Creates a topic without any additional topic-level configuration.
     *
     * <p>Delegates to the five-argument overload, passing an empty configuration map.
     *
     * @param str topic name
     * @param i   number of partitions
     * @param s   replication factor
     * @param z   when {@code true}, the topic is created with {@code cleanup.policy=compact}
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public void createTopic(String str, int i, short s, boolean z) {
        final Map<String, String> noExtraConfigs = Collections.emptyMap();
        createTopic(str, i, s, noExtraConfigs, z);
    }

    /**
     * Ensures that a topic with the given name, partition count and replication factor exists.
     *
     * <p>When the topic already exists (either detected up front or reported by the cluster through
     * a {@link TopicExistsException}), no topic is created; its properties are validated instead.
     *
     * @param str topic name
     * @param i   number of partitions
     * @param s   replication factor
     * @param map topic-level configuration; copied, never modified
     * @param z   when {@code true}, {@code cleanup.policy=compact} is added to the configuration
     * @throws KafkaResponseGetFailedException if the create request fails for any reason other than
     *                                         the topic already existing, or the thread is interrupted
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public void createTopic(String str, int i, short s, Map<String, String> map, boolean z) {
        if (isTopicExists(str)) {
            validateTopicProperties(str, i, s);
            return;
        }
        final Map<String, String> topicConfigs = new HashMap<>(map);
        if (z) {
            topicConfigs.put("cleanup.policy", "compact");
        }
        final NewTopic newTopic = new NewTopic(str, i, s);
        newTopic.configs(topicConfigs);
        try {
            log.info("Creating topic '{}'", str);
            new RetryHelper<Void>().executeWithRetries(
                    () -> this.adminClient.createTopics(Collections.singleton(newTopic)).all());
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers; the wrapping exception alone would hide it.
            Thread.currentThread().interrupt();
            throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " + str, e);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " + str, e);
            }
            // Someone else created the topic between the existence check and our request.
            validateTopicProperties(str, i, s);
        }
    }

    /**
     * Reports whether a topic is present in a topic listing.
     *
     * <p>The name is split on {@code ':'}. When the split yields more than one part, the first part
     * is passed to {@code listTopicNames(String)} and the second part is looked up in the result
     * (any further parts are ignored). Otherwise the whole name is looked up in the result of
     * {@code listTopicNames()}.
     *
     * @param str topic name, optionally containing a {@code ':'} separator
     * @return {@code true} if the listing contains the topic
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public boolean isTopicExists(String str) {
        log.trace("Checking for existence of topic '{}'", str);
        final String[] parts = str.split(":");
        if (parts.length <= 1) {
            return listTopicNames().contains(str);
        }
        final String listingKey = parts[0];
        final String topicPart = parts[1];
        return listTopicNames(listingKey).contains(topicPart);
    }

    /**
     * Lists topic names by passing {@code str} to the admin client's {@code listTopics} call.
     *
     * @param str argument forwarded unchanged to {@code adminClient.listTopics}
     * @return the topic names reported by the admin client
     * @throws KafkaResponseGetFailedException if the request fails or the thread is interrupted
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public Set<String> listTopicNames(String str) {
        try {
            return this.adminClient.listTopics(str).names().get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers; the wrapping exception alone would hide it.
                Thread.currentThread().interrupt();
            }
            throw new KafkaResponseGetFailedException("Failed to retrieve Kafka Topic names", e);
        }
    }

    /**
     * Lists topic names using the default stream name obtained from the configured supplier.
     *
     * <p>The default stream name is read once, so the value that is validated is the same value
     * that is sent with the request (including on retries).
     *
     * @return the topic names reported by the admin client
     * @throws KafkaException                  if the supplier yields {@code null} or an empty string
     * @throws KafkaResponseGetFailedException if the request fails or the thread is interrupted
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public Set<String> listTopicNames() {
        final String defaultStream = this.ksqlDefaultStream.get();
        if (defaultStream == null || defaultStream.isEmpty()) {
            throw new KafkaException("Cannot get listTopicNames() without default stream name");
        }
        try {
            return new RetryHelper<Set<String>>().executeWithRetries(
                    () -> this.adminClient.listTopics(defaultStream).names());
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers; the wrapping exception alone would hide it.
                Thread.currentThread().interrupt();
            }
            throw new KafkaResponseGetFailedException("Failed to retrieve Kafka Topic names", e);
        }
    }

    /**
     * Describes the given topics.
     *
     * <p>Each name is first passed through
     * {@code CommonUtils.decorateTopicWithDefaultStreamIfNeeded} together with the current default
     * stream name; the resulting names are what is sent to the admin client.
     *
     * @param collection topic names to describe
     * @return the descriptions as returned by the admin client for the rewritten names
     * @throws KafkaResponseGetFailedException if the request fails or the thread is interrupted
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public Map<String, TopicDescription> describeTopics(Collection<String> collection) {
        final Set<String> decoratedNames = collection.stream()
                .map(topic -> CommonUtils.decorateTopicWithDefaultStreamIfNeeded(topic, this.ksqlDefaultStream.get()))
                .collect(Collectors.toSet());
        try {
            return new RetryHelper<Map<String, TopicDescription>>().executeWithRetries(
                    () -> this.adminClient.describeTopics(decoratedNames).all());
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers; the wrapping exception alone would hide it.
                Thread.currentThread().interrupt();
            }
            throw new KafkaResponseGetFailedException("Failed to Describe Kafka Topics", e);
        }
    }

    /**
     * Looks up the {@code cleanup.policy} setting of a topic.
     *
     * <p>The topic's configuration is fetched through the admin client; the first returned
     * {@link Config} is searched (case-insensitively) for the {@code cleanup.policy} entry, whose
     * lower-cased value is mapped to a {@code TopicCleanupPolicy}.
     *
     * @param str topic name
     * @return {@code COMPACT}, {@code DELETE} or {@code COMPACT_DELETE}
     * @throws KsqlException if the configuration cannot be fetched, contains no
     *                       {@code cleanup.policy} entry, or holds an unrecognised value
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public KafkaTopicClient.TopicCleanupPolicy getTopicCleanupPolicy(String str) {
        final String failureMessage = "Could not get the topic configs for : " + str;
        try {
            final ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
            final Map<ConfigResource, Config> configs = new RetryHelper<Map<ConfigResource, Config>>()
                    .executeWithRetries(
                            () -> this.adminClient.describeConfigs(Collections.singleton(topicResource)).all());
            if (configs == null) {
                throw new KsqlException(failureMessage);
            }
            final String policy = configs.values().stream()
                    .findFirst()
                    .flatMap(config -> config.entries().stream()
                            .filter(entry -> entry.name().equalsIgnoreCase("cleanup.policy"))
                            .findFirst())
                    .map(ConfigEntry::value)
                    .orElseThrow(() -> new KsqlException(failureMessage))
                    .toLowerCase();
            switch (policy) {
                case "compact":
                    return KafkaTopicClient.TopicCleanupPolicy.COMPACT;
                case "delete":
                    return KafkaTopicClient.TopicCleanupPolicy.DELETE;
                case "compact+delete":
                    return KafkaTopicClient.TopicCleanupPolicy.COMPACT_DELETE;
                default:
                    throw new KsqlException(failureMessage);
            }
        } catch (KsqlException e) {
            // Already carries the right type and message; wrapping it again adds nothing.
            throw e;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers; the wrapping exception alone would hide it.
            Thread.currentThread().interrupt();
            throw new KsqlException(failureMessage, e);
        } catch (Exception e) {
            throw new KsqlException(failureMessage, e);
        }
    }

    /**
     * Deletes the given topics and waits up to 30 seconds per topic for each deletion to finish.
     *
     * <p>Does nothing except log a message when topic deletion is not enabled. Every topic is
     * waited for even if an earlier one failed; failures are logged and reported together.
     *
     * @param list names of the topics to delete
     * @throws KsqlException if at least one deletion failed or did not finish in time; the message
     *                       lists the affected topics
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public void deleteTopics(List<String> list) {
        if (!this.isDeleteTopicEnabled) {
            log.info("Cannot delete topics since 'delete.topic.enable' is false. ");
            return;
        }
        final Map<String, KafkaFuture<Void>> results = this.adminClient.deleteTopics(list).values();
        final List<String> failedTopics = new ArrayList<>();
        boolean interrupted = false;
        for (final Map.Entry<String, KafkaFuture<Void>> result : results.entrySet()) {
            try {
                result.getValue().get(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Remember the interrupt but keep collecting the remaining results;
                // the flag is restored once the loop is done.
                interrupted = true;
                failedTopics.add(result.getKey());
                log.warn("Interrupted while waiting for deletion of topic '{}'", result.getKey(), e);
            } catch (Exception e) {
                failedTopics.add(result.getKey());
                log.warn("Failed to delete topic '{}'", result.getKey(), e);
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (!failedTopics.isEmpty()) {
            throw new KsqlException("Failed to clean up topics: " + String.join(",", failedTopics));
        }
    }

    /**
     * Removes the directory {@code /apps/kafka-streams/<str>} through the Hadoop
     * {@link FileSystem} API, recursively, if it exists.
     *
     * <p>Only logs a warning when topic deletion is not enabled. Any exception raised during the
     * cleanup is logged and not propagated.
     *
     * @param str application id whose directory is removed
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient
    public void deleteInternalTopics(String str) {
        if (this.isDeleteTopicEnabled) {
            try {
                final FileSystem fs = FileSystem.get(new Configuration());
                final Path applicationDir = new Path("/apps/kafka-streams/" + str);
                final boolean present = fs.exists(applicationDir);
                if (present) {
                    fs.delete(applicationDir, true);
                }
            } catch (Exception e) {
                log.error("Exception while trying to clean up internal topics for application id: {}.", str, e);
            }
        } else {
            log.warn("Cannot delete topics since 'delete.topic.enable' is false. ");
        }
    }

    /**
     * Verifies that the cluster is reachable and enables topic deletion.
     *
     * <p>Fetches the cluster's nodes through the admin client. When at least one node is returned,
     * {@code isDeleteTopicEnabled} is set to {@code true}.
     *
     * @throws KsqlException if the node list is empty, the request fails, or the thread is
     *                       interrupted while waiting
     */
    private void init() {
        try {
            final Collection<Node> nodes = this.adminClient.describeCluster().nodes().get();
            if (nodes.isEmpty()) {
                log.warn("No available broker found to fetch config info.");
                throw new KsqlException("Could not fetch broker information. KSQL cannot initialize AdminClient.");
            }
            this.isDeleteTopicEnabled = true;
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers; the wrapping exception alone would hide it.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to initialize TopicClient: {}", e.getMessage());
            throw new KsqlException("Could not fetch broker information. KSQL cannot initialize AdminClient.", e);
        }
    }

    /**
     * Checks a topic name against the pattern {@code <str2>-...-changelog} or
     * {@code <str2>-...-repartition}.
     *
     * @param str  topic name to test
     * @param str2 prefix expected before the first {@code '-'}
     * @return {@code true} if {@code str} starts with {@code str2 + "-"} and ends with
     *         {@code "-changelog"} or {@code "-repartition"}
     */
    private boolean isInternalTopic(String str, String str2) {
        final String requiredPrefix = str2 + "-";
        if (!str.startsWith(requiredPrefix)) {
            return false;
        }
        if (str.endsWith("-changelog")) {
            return true;
        }
        return str.endsWith("-repartition");
    }

    /**
     * Closes the underlying admin client.
     */
    @Override // io.confluent.ksql.util.KafkaTopicClient, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.adminClient.close();
    }

    /**
     * Checks that an existing topic has the expected partition count and a sufficient
     * replication factor.
     *
     * <p>The replication factor is taken from the replica count of the topic's first partition.
     *
     * @param str topic name
     * @param i   required number of partitions (must match exactly)
     * @param s   required replication factor (the topic may have more replicas, not fewer)
     * @throws KafkaTopicException if the topic cannot be described or does not meet the
     *                             requirements
     */
    private void validateTopicProperties(String str, int i, short s) {
        final Map<String, TopicDescription> descriptions = describeTopics(Collections.singletonList(str));
        TopicDescription description = descriptions.get(str);
        if (description == null && descriptions.size() == 1) {
            // describeTopics rewrites names before querying, so the result may be keyed by the
            // rewritten name. Exactly one topic was requested, so the sole entry is its description.
            description = descriptions.values().iterator().next();
        }
        if (description == null) {
            throw new KafkaTopicException(String.format("Topic '%s' could not be described, so its partitions and replication cannot be validated", str));
        }
        final List<TopicPartitionInfo> partitions = description.partitions();
        final int actualPartitions = partitions.size();
        // Guard against a description without partitions instead of failing on get(0).
        final int actualReplicas = partitions.isEmpty() ? 0 : partitions.get(0).replicas().size();
        if (actualPartitions != i || actualReplicas < s) {
            throw new KafkaTopicException(String.format("Topic '%s' does not conform to the requirements Partitions:%d v %d. Replication: %d v %d", str, actualPartitions, i, actualReplicas, s));
        }
        log.debug("Did not create topic {} with {} partitions and replication-factor {} since it already exists", str, i, s);
    }
}
