/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.testutils;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.TypeDescription;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import org.yaml.snakeyaml.nodes.Node;
import org.yaml.snakeyaml.nodes.SequenceNode;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;

public class YamlFileMetadataService
implements KafkaMetadataService {
    private static final Logger logger = LoggerFactory.getLogger(YamlFileMetadataService.class);
    private final String metadataFilePath;
    private final Duration refreshInterval;
    private Instant lastRefresh;
    private transient Set<KafkaStream> streamMetadata;
    private transient Yaml yaml;

    public YamlFileMetadataService(String metadataFilePath, Duration metadataTtl) {
        this.metadataFilePath = metadataFilePath;
        this.refreshInterval = metadataTtl;
        this.lastRefresh = Instant.MIN;
    }

    public Set<KafkaStream> getAllStreams() {
        this.refreshIfNeeded();
        return this.streamMetadata;
    }

    public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Set<KafkaStream> streams = this.getAllStreams();
        for (KafkaStream stream : streams) {
            if (!streamIds.contains(stream.getStreamId())) continue;
            builder.put((Object)stream.getStreamId(), (Object)stream);
        }
        return builder.build();
    }

    public boolean isClusterActive(String kafkaClusterId) {
        return this.getAllStreams().stream().flatMap(kafkaStream -> kafkaStream.getClusterMetadataMap().keySet().stream()).anyMatch(cluster -> cluster.equals(kafkaClusterId));
    }

    public void close() throws Exception {
    }

    public static void saveToYaml(List<StreamMetadata> streamMetadata, File metadataFile) throws IOException {
        logger.debug("Writing stream infos to file: {}", streamMetadata);
        Yaml yaml = YamlFileMetadataService.initYamlParser();
        FileWriter fileWriter = new FileWriter(metadataFile, false);
        yaml.dump(streamMetadata, (Writer)fileWriter);
        fileWriter.close();
    }

    public static void saveToYamlFromKafkaStreams(List<KafkaStream> kafkaStreams, File metadataFile) throws IOException {
        YamlFileMetadataService.saveToYaml(kafkaStreams.stream().map(YamlFileMetadataService::convertToStreamMetadata).collect(Collectors.toList()), metadataFile);
    }

    private static StreamMetadata convertToStreamMetadata(KafkaStream kafkaStream) {
        return new StreamMetadata(kafkaStream.getStreamId(), kafkaStream.getClusterMetadataMap().entrySet().stream().map(entry -> new StreamMetadata.ClusterMetadata((String)entry.getKey(), ((ClusterMetadata)entry.getValue()).getProperties().getProperty("bootstrap.servers"), new ArrayList<String>(((ClusterMetadata)entry.getValue()).getTopics()))).collect(Collectors.toList()));
    }

    private void refreshIfNeeded() {
        Instant now = Instant.now();
        try {
            if (now.isAfter(this.lastRefresh.plus(this.refreshInterval.toMillis(), ChronoUnit.MILLIS))) {
                this.streamMetadata = this.parseFile();
                this.lastRefresh = now;
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @VisibleForTesting
    Set<KafkaStream> parseFile() throws IOException {
        if (this.yaml == null) {
            this.yaml = YamlFileMetadataService.initYamlParser();
        }
        List streamMetadataList = (List)this.yaml.load(Files.newInputStream(Paths.get(this.metadataFilePath, new String[0]), new OpenOption[0]));
        if (logger.isDebugEnabled()) {
            logger.debug("Input stream of metadata file has size: {}", (Object)Files.newInputStream(Paths.get(this.metadataFilePath, new String[0]), new OpenOption[0]).available());
        }
        HashSet<KafkaStream> kafkaStreams = new HashSet<KafkaStream>();
        for (StreamMetadata streamMetadata : streamMetadataList) {
            HashMap<String, ClusterMetadata> clusterMetadataMap = new HashMap<String, ClusterMetadata>();
            for (StreamMetadata.ClusterMetadata clusterMetadata : streamMetadata.getClusterMetadataList()) {
                String kafkaClusterId = clusterMetadata.getClusterId() != null ? clusterMetadata.getClusterId() : clusterMetadata.getBootstrapServers();
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", clusterMetadata.getBootstrapServers());
                clusterMetadataMap.put(kafkaClusterId, new ClusterMetadata(new HashSet<String>(clusterMetadata.getTopics()), properties));
            }
            kafkaStreams.add(new KafkaStream(streamMetadata.getStreamId(), clusterMetadataMap));
        }
        logger.debug("From {} loaded metadata: {}", (Object)this.metadataFilePath, kafkaStreams);
        return kafkaStreams;
    }

    private static Yaml initYamlParser() {
        DumperOptions dumperOptions = new DumperOptions();
        Representer representer = new Representer(dumperOptions);
        representer.addClassTag(StreamMetadata.class, Tag.MAP);
        TypeDescription typeDescription = new TypeDescription(StreamMetadata.class);
        representer.addTypeDescription(typeDescription);
        representer.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
        LoaderOptions loaderOptions = new LoaderOptions();
        loaderOptions.setTagInspector(tag -> tag.getClassName().equals(StreamMetadata.class.getName()));
        return new Yaml(new ListConstructor<StreamMetadata>(StreamMetadata.class, loaderOptions), representer);
    }

    public static class StreamMetadata {
        private String streamId;
        private List<ClusterMetadata> clusterMetadataList;

        public StreamMetadata() {
        }

        public StreamMetadata(String streamId, List<ClusterMetadata> clusterMetadataList) {
            this.streamId = streamId;
            this.clusterMetadataList = clusterMetadataList;
        }

        public String getStreamId() {
            return this.streamId;
        }

        public void setStreamId(String streamId) {
            this.streamId = streamId;
        }

        public List<ClusterMetadata> getClusterMetadataList() {
            return this.clusterMetadataList;
        }

        public void setClusterMetadataList(List<ClusterMetadata> clusterMetadata) {
            this.clusterMetadataList = clusterMetadata;
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("streamId", (Object)this.streamId).add("clusterMetadataList", this.clusterMetadataList).toString();
        }

        public static class ClusterMetadata {
            private String clusterId;
            private String bootstrapServers;
            private List<String> topics;

            public ClusterMetadata() {
            }

            public ClusterMetadata(String clusterId, String bootstrapServers, List<String> topics) {
                this.clusterId = clusterId;
                this.bootstrapServers = bootstrapServers;
                this.topics = topics;
            }

            public String getClusterId() {
                return this.clusterId;
            }

            public void setClusterId(String clusterId) {
                this.clusterId = clusterId;
            }

            public String getBootstrapServers() {
                return this.bootstrapServers;
            }

            public void setBootstrapServers(String bootstrapServers) {
                this.bootstrapServers = bootstrapServers;
            }

            public List<String> getTopics() {
                return this.topics;
            }

            public void setTopics(List<String> topics) {
                this.topics = topics;
            }
        }
    }

    private static class ListConstructor<T>
    extends Constructor {
        private final Class<T> clazz;

        public ListConstructor(Class<T> clazz, LoaderOptions loaderOptions) {
            super(loaderOptions);
            this.clazz = clazz;
        }

        protected Object constructObject(Node node) {
            if (node instanceof SequenceNode && this.isRootNode(node)) {
                ((SequenceNode)node).setListType(this.clazz);
            }
            return super.constructObject(node);
        }

        private boolean isRootNode(Node node) {
            return node.getStartMark().getIndex() == 0;
        }
    }
}

