/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.ddl.commands;

import io.confluent.ksql.ddl.commands.DDLCommand;
import io.confluent.ksql.ddl.commands.DDLCommandResult;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.RegisterTopic;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe;
import io.confluent.ksql.serde.delimited.KsqlDelimitedTopicSerDe;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.StringUtil;
import java.util.Map;

public class RegisterTopicCommand
implements DDLCommand {
    private final String topicName;
    private final String kafkaTopicName;
    private final KsqlTopicSerDe topicSerDe;
    private final boolean notExists;

    public RegisterTopicCommand(RegisterTopic registerTopic) {
        this(registerTopic.getName().getSuffix(), registerTopic.isNotExists(), registerTopic.getProperties());
    }

    RegisterTopicCommand(String topicName, boolean notExist, Map<String, Expression> properties) {
        this.topicName = topicName;
        this.enforceTopicProperties(properties);
        this.kafkaTopicName = StringUtil.cleanQuotes(properties.get("KAFKA_TOPIC").toString());
        String serde = StringUtil.cleanQuotes(properties.get("VALUE_FORMAT").toString());
        this.topicSerDe = this.extractTopicSerDe(serde);
        this.notExists = notExist;
    }

    private KsqlTopicSerDe extractTopicSerDe(String serde) {
        switch (serde.toUpperCase()) {
            case "AVRO": {
                return new KsqlAvroTopicSerDe();
            }
            case "JSON": {
                return new KsqlJsonTopicSerDe();
            }
            case "DELIMITED": {
                return new KsqlDelimitedTopicSerDe();
            }
        }
        throw new KsqlException("The specified topic serde is not supported.");
    }

    private void enforceTopicProperties(Map<String, Expression> properties) {
        if (properties.size() == 0) {
            throw new KsqlException("Register topic statement needs WITH clause.");
        }
        if (!properties.containsKey("VALUE_FORMAT")) {
            throw new KsqlException("Topic format(format) should be set in WITH clause.");
        }
        if (!properties.containsKey("KAFKA_TOPIC")) {
            throw new KsqlException("Corresponding kafka topic should be set in WITH clause.");
        }
    }

    @Override
    public DDLCommandResult run(MetaStore metaStore) {
        if (metaStore.getTopic(this.topicName) != null) {
            if (this.notExists) {
                return new DDLCommandResult(true, "Topic is not registered because it already registered.");
            }
            throw new KsqlException("Topic already registered.");
        }
        KsqlTopic ksqlTopic = new KsqlTopic(this.topicName, this.kafkaTopicName, this.topicSerDe);
        metaStore.putTopic(ksqlTopic);
        return new DDLCommandResult(true, "Topic registered");
    }
}

