/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.testutils;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.testutils.KafkaPartitionDataWriter;
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSourceExternalContext
implements DataStreamSourceExternalContext<String> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceExternalContext.class);
    private static final String TOPIC_NAME_PREFIX = "kafka-test-topic-";
    private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("kafka-test-topic-.*");
    private static final String GROUP_ID_PREFIX = "kafka-source-external-context-";
    private static final int NUM_RECORDS_UPPER_BOUND = 500;
    private static final int NUM_RECORDS_LOWER_BOUND = 100;
    private final List<URL> connectorJarPaths;
    private final String bootstrapServers;
    private final String topicName;
    private final SplitMappingMode splitMappingMode;
    private final AdminClient adminClient;
    private final List<KafkaPartitionDataWriter> writers = new ArrayList<KafkaPartitionDataWriter>();

    protected KafkaSourceExternalContext(String bootstrapServers, SplitMappingMode splitMappingMode, List<URL> connectorJarPaths) {
        this.connectorJarPaths = connectorJarPaths;
        this.bootstrapServers = bootstrapServers;
        this.topicName = this.randomize(TOPIC_NAME_PREFIX);
        this.splitMappingMode = splitMappingMode;
        this.adminClient = this.createAdminClient();
    }

    public List<URL> getConnectorJarPaths() {
        return this.connectorJarPaths;
    }

    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
        KafkaSourceBuilder builder = KafkaSource.builder();
        builder.setBootstrapServers(this.bootstrapServers).setTopicPattern(TOPIC_NAME_PATTERN).setGroupId(this.randomize(GROUP_ID_PREFIX)).setProperties(KafkaUtil.getDefaultKafkaClientsProperties()).setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
        if (sourceSettings.getBoundedness().equals((Object)Boundedness.BOUNDED)) {
            builder.setBounded(OffsetsInitializer.latest());
        }
        return builder.build();
    }

    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(TestingSourceSettings sourceSettings) {
        KafkaPartitionDataWriter writer;
        try {
            switch (this.splitMappingMode) {
                case TOPIC: {
                    writer = this.createSinglePartitionTopic(this.writers.size());
                    break;
                }
                case PARTITION: {
                    writer = this.scaleOutTopic(this.topicName);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Split mode should be either TOPIC or PARTITION");
                }
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create new splits", e);
        }
        this.writers.add(writer);
        return writer;
    }

    public List<String> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed) {
        Random random = new Random(seed);
        int recordNum = random.nextInt(400) + 100;
        ArrayList<String> records = new ArrayList<String>(recordNum);
        for (int i = 0; i < recordNum; ++i) {
            int stringLength = random.nextInt(50) + 1;
            records.add(splitIndex + "-" + RandomStringUtils.randomAlphanumeric((int)stringLength));
        }
        return records;
    }

    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    public void close() throws Exception {
        ArrayList topics = new ArrayList();
        this.writers.forEach(writer -> {
            topics.add(writer.getTopicPartition().topic());
            writer.close();
        });
        this.adminClient.deleteTopics(topics).all().get();
    }

    public String toString() {
        return "KafkaSource-" + this.splitMappingMode.toString();
    }

    private String randomize(String prefix) {
        return prefix + ThreadLocalRandom.current().nextLong(0L, Long.MAX_VALUE);
    }

    private AdminClient createAdminClient() {
        Properties config = KafkaUtil.getDefaultKafkaClientsProperties();
        config.setProperty("bootstrap.servers", this.bootstrapServers);
        return AdminClient.create((Properties)config);
    }

    private KafkaPartitionDataWriter createSinglePartitionTopic(int topicIndex) throws Exception {
        String newTopicName = this.topicName + "-" + topicIndex;
        LOG.info("Creating topic '{}'", (Object)newTopicName);
        this.adminClient.createTopics(Collections.singletonList(new NewTopic(newTopicName, 1, 1))).all().get();
        return new KafkaPartitionDataWriter(this.getKafkaProducerProperties(topicIndex), new TopicPartition(newTopicName, 0));
    }

    private KafkaPartitionDataWriter scaleOutTopic(String topicName) throws Exception {
        Set topics = (Set)this.adminClient.listTopics().names().get();
        if (topics.contains(topicName)) {
            Map topicDescriptions = (Map)this.adminClient.describeTopics(Collections.singletonList(topicName)).allTopicNames().get();
            int numPartitions = ((TopicDescription)topicDescriptions.get(topicName)).partitions().size();
            LOG.info("Creating partition {} for topic '{}'", (Object)(numPartitions + 1), (Object)topicName);
            this.adminClient.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo((int)(numPartitions + 1)))).all().get();
            return new KafkaPartitionDataWriter(this.getKafkaProducerProperties(numPartitions), new TopicPartition(topicName, numPartitions));
        }
        LOG.info("Creating topic '{}'", (Object)topicName);
        this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 1))).all().get();
        return new KafkaPartitionDataWriter(this.getKafkaProducerProperties(0), new TopicPartition(topicName, 0));
    }

    private Properties getKafkaProducerProperties(int producerId) {
        Properties kafkaProducerProperties = KafkaUtil.getDefaultKafkaClientsProperties();
        kafkaProducerProperties.setProperty("bootstrap.servers", this.bootstrapServers);
        kafkaProducerProperties.setProperty("client.id", String.join((CharSequence)"-", "flink-kafka-split-writer", Integer.toString(producerId), Long.toString(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))));
        kafkaProducerProperties.setProperty("key.serializer", ByteArraySerializer.class.getName());
        kafkaProducerProperties.setProperty("value.serializer", ByteArraySerializer.class.getName());
        return kafkaProducerProperties;
    }

    public static enum SplitMappingMode {
        TOPIC,
        PARTITION;

    }
}

