/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.MetadataObserver;
import io.confluent.kafkarest.StreamsMetadataConsumer;
import io.confluent.kafkarest.entities.Partition;
import io.confluent.kafkarest.entities.PartitionReplica;
import io.confluent.kafkarest.entities.Topic;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import kafka.cluster.Broker;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamsMetadataObserver
extends MetadataObserver {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsMetadataObserver.class);
    private StreamsMetadataConsumer streamsMetadataConsumer;
    private boolean isStreams;
    private boolean defaultStreamSet;
    private String defaultStream;
    private ZkUtils zkUtil;
    private boolean isImpersonationEnabled;

    public boolean isImpersonationEnabled() {
        return this.isImpersonationEnabled;
    }

    public KafkaStreamsMetadataObserver(KafkaRestConfig config, ZkUtils zkUtils, boolean isStreams, boolean isImpersonationEnabled) {
        super(zkUtils);
        String bootstrapServers = config.getString("bootstrap.servers");
        String defaultStream = config.getString("streams.default.stream");
        this.defaultStreamSet = config.isDefaultStreamSet();
        this.defaultStream = defaultStream;
        this.isStreams = isStreams;
        this.isImpersonationEnabled = isImpersonationEnabled;
        this.zkUtil = zkUtils;
        this.streamsMetadataConsumer = new StreamsMetadataConsumer(bootstrapServers, defaultStream);
    }

    @Override
    public Broker getLeader(String topicName, int partitionId) {
        if (this.isStreams) {
            throw Errors.notSupportedByMapRStreams();
        }
        return super.getLeader(topicName, partitionId);
    }

    @Override
    public Collection<String> getTopicNames() {
        if (this.isStreams || this.defaultStreamSet) {
            try {
                return this.streamsMetadataConsumer.listTopics().keySet();
            }
            catch (KafkaException e) {
                log.warn("listTopics() API", (Throwable)e);
                throw Errors.notSupportedByMapRStreams("Please try to set streams.default.stream to return topics for default stream");
            }
        }
        return super.getTopicNames();
    }

    public Collection<String> getTopicNames(String stream) {
        return this.streamsMetadataConsumer.listTopics(stream).keySet();
    }

    public String toFullyQualifiedTopic(String topic) {
        if (this.defaultStreamSet && !topic.contains(":")) {
            String fullName = this.defaultStream;
            if (!fullName.endsWith(":")) {
                fullName = fullName.concat(":");
            }
            return fullName.concat(topic);
        }
        return topic;
    }

    @Override
    public List<Topic> getTopics() {
        if (this.isStreams || this.defaultStreamSet) {
            try {
                ArrayList<Topic> result = new ArrayList<Topic>();
                Map<String, List<PartitionInfo>> topicsMap = this.streamsMetadataConsumer.listTopics();
                for (Map.Entry<String, List<PartitionInfo>> entry : topicsMap.entrySet()) {
                    Topic topic = new Topic(entry.getKey(), null, KafkaStreamsMetadataObserver.convertPartitions(entry.getValue()));
                    result.add(topic);
                }
                return result;
            }
            catch (KafkaException e) {
                log.warn("listTopics() API", (Throwable)e);
                throw Errors.notSupportedByMapRStreams("Please try to set streams.default.stream to return topics for default stream");
            }
        }
        return super.getTopics();
    }

    public List<Topic> getTopics(String stream) {
        ArrayList<Topic> result = new ArrayList<Topic>();
        Map<String, List<PartitionInfo>> topicsMap = this.streamsMetadataConsumer.listTopics(stream);
        for (Map.Entry<String, List<PartitionInfo>> entry : topicsMap.entrySet()) {
            Topic topic = new Topic(entry.getKey(), null, KafkaStreamsMetadataObserver.convertPartitions(entry.getValue()));
            result.add(topic);
        }
        return result;
    }

    @Override
    public boolean topicExists(String topicName) {
        if (this.isStreams || this.defaultStreamSet) {
            if (topicName.startsWith("/") && topicName.contains(":")) {
                String[] splitted = topicName.split(":");
                String stream = splitted[0];
                return this.getTopicNames(stream).contains(topicName);
            }
            return this.getTopicNames().contains(this.toFullyQualifiedTopic(topicName));
        }
        if (topicName.startsWith("/") && topicName.contains(":")) {
            String[] splitted = topicName.split(":");
            String stream = splitted[0];
            return this.getTopicNames(stream).contains(topicName);
        }
        return super.getTopicNames().contains(topicName);
    }

    public boolean requestToStreams(String topic) {
        return this.isStreams || this.defaultStreamSet || topic.startsWith("/") && topic.contains(":");
    }

    public Topic getTopic(String topicName) {
        if (this.isStreams || this.defaultStreamSet) {
            return new Topic(topicName, null, this.getTopicPartitions(topicName));
        }
        if (topicName.startsWith("/") && topicName.contains(":")) {
            return new Topic(topicName, null, this.getTopicPartitions(topicName));
        }
        return null;
    }

    @Override
    public List<Partition> getTopicPartitions(String topic) {
        if (this.isStreams || this.defaultStreamSet) {
            return KafkaStreamsMetadataObserver.convertPartitions(this.streamsMetadataConsumer.partitionsFor(topic));
        }
        if (topic.startsWith("/") && topic.contains(":")) {
            return KafkaStreamsMetadataObserver.convertPartitions(this.streamsMetadataConsumer.partitionsFor(topic));
        }
        return super.getTopicPartitions(topic);
    }

    @Override
    public int getLeaderId(String topicName, int partitionId) {
        if (this.isStreams) {
            throw Errors.notSupportedByMapRStreams();
        }
        return super.getLeaderId(topicName, partitionId);
    }

    @Override
    public void shutdown() {
        log.debug("Shutting down MetadataObserver");
        try {
            if (this.streamsMetadataConsumer != null) {
                this.streamsMetadataConsumer.close();
            }
            super.shutdown();
        }
        catch (Exception e) {
            log.error("Close metadata consumer", (Throwable)e);
        }
    }

    public Partition getTopicPartition(String topic, int partition) {
        List<Partition> partitions = this.getTopicPartitions(topic);
        for (Partition p : partitions) {
            if (p.getPartition() != partition) continue;
            return p;
        }
        return null;
    }

    public boolean partitionExists(String topicName, int partition) {
        if (this.isStreams || this.defaultStreamSet) {
            return partition >= 0 && partition < this.streamsMetadataConsumer.partitionsFor(topicName).size();
        }
        if (topicName.startsWith("/") && topicName.contains(":")) {
            return partition >= 0 && partition < this.streamsMetadataConsumer.partitionsFor(topicName).size();
        }
        return false;
    }

    private static List<Partition> convertPartitions(List<PartitionInfo> partitionInfos) {
        ArrayList<Partition> partitions = new ArrayList<Partition>();
        for (PartitionInfo partitionInfo : partitionInfos) {
            ArrayList<PartitionReplica> replicas = new ArrayList<PartitionReplica>();
            replicas.add(new PartitionReplica(partitionInfo.leader().id(), true, true));
            List<Node> inSyncReplicas = Arrays.asList(partitionInfo.inSyncReplicas());
            for (Node node : partitionInfo.replicas()) {
                replicas.add(new PartitionReplica(node.id(), false, inSyncReplicas.contains(node)));
            }
            Partition partition = new Partition(partitionInfo.partition(), partitionInfo.leader().id(), replicas);
            partitions.add(partition);
        }
        return partitions;
    }

    public ZkUtils getZkUtils() {
        return this.zkUtil;
    }

    public boolean isStreams() {
        return this.isStreams;
    }
}

