/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.confluent.schemaregistry;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.confluent.schemaregistry.MultipleURLValidator;
import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType;
import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient;
import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient;
import org.apache.nifi.confluent.schemaregistry.client.SchemaRegistryClient;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.ssl.SSLContextService;

@Tags(value={"schema", "registry", "confluent", "avro", "kafka"})
@CapabilityDescription(value="Provides a Schema Registry that interacts with the Confluent Schema Registry so that those Schemas that are stored in the Confluent Schema Registry can be used in NiFi. The Confluent Schema Registry has a notion of a \"subject\" for schemas, which is their terminology for a schema name. When a Schema is looked up by name by this registry, it will find a Schema in the Confluent Schema Registry with that subject.")
@DynamicProperty(name="request.header.*", value="String literal, may not be empty", description="Properties that begin with 'request.header.' are populated into a map and passed as http headers in REST requests to the Confluent Schema Registry")
public class ConfluentSchemaRegistry
extends AbstractControllerService
implements SchemaRegistry {
    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
    private static final String REQUEST_HEADER_PREFIX = "request.header.";
    static final PropertyDescriptor SCHEMA_REGISTRY_URLS = new PropertyDescriptor.Builder().name("url").displayName("Schema Registry URLs").description("A comma-separated list of URLs of the Schema Registry to interact with").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("http://localhost:8081").required(true).addValidator((Validator)new MultipleURLValidator()).build();
    static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder().name("ssl-context").displayName("SSL Context Service").description("Specifies the SSL Context Service to use for interacting with the Confluent Schema Registry").identifiesControllerService(SSLContextService.class).required(false).build();
    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder().name("cache-size").displayName("Cache Size").description("Specifies how many Schemas should be cached from the Schema Registry").addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).defaultValue("1000").required(true).build();
    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder().name("cache-expiration").displayName("Cache Expiration").description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a cached version of a schema will no longer be used, and the service will have to communicate with the Schema Registry again in order to obtain the schema.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("1 hour").required(true).build();
    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder().name("timeout").displayName("Communications Timeout").description("Specifies how long to wait to receive data from the Schema Registry before considering the communications a failure").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("30 secs").required(true).build();
    static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder().name("authentication-type").displayName("Authentication Type").description("HTTP Client Authentication Type for Confluent Schema Registry").required(false).allowableValues((Enum[])AuthenticationType.values()).defaultValue(AuthenticationType.NONE.toString()).build();
    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder().name("username").displayName("Username").description("Username for authentication to Confluent Schema Registry").addValidator(StandardValidators.createRegexMatchingValidator((Pattern)Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))).required(false).dependsOn(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString(), new String[0]).build();
    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("password").displayName("Password").description("Password for authentication to Confluent Schema Registry").addValidator(StandardValidators.createRegexMatchingValidator((Pattern)Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))).required(false).dependsOn(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString(), new String[0]).sensitive(true).build();
    private volatile SchemaRegistryClient client;
    private static final Validator REQUEST_HEADER_VALIDATOR = new Validator(){

        public ValidationResult validate(String subject, String value, ValidationContext context) {
            return new ValidationResult.Builder().subject(subject).input(value).valid(subject.startsWith(ConfluentSchemaRegistry.REQUEST_HEADER_PREFIX) && subject.length() > ConfluentSchemaRegistry.REQUEST_HEADER_PREFIX.length()).explanation("Dynamic property names must be of format 'request.header.*'").build();
        }
    };

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(SCHEMA_REGISTRY_URLS);
        properties.add(SSL_CONTEXT);
        properties.add(TIMEOUT);
        properties.add(CACHE_SIZE);
        properties.add(CACHE_EXPIRATION);
        properties.add(AUTHENTICATION_TYPE);
        properties.add(USERNAME);
        properties.add(PASSWORD);
        return properties;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptionName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptionName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).addValidator(REQUEST_HEADER_VALIDATOR).build();
    }

    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        List<String> baseUrls = this.getBaseURLs((PropertyContext)context);
        int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
        SSLContext sslContext = sslContextService == null ? null : sslContextService.createContext();
        String username = context.getProperty(USERNAME).getValue();
        String password = context.getProperty(PASSWORD).getValue();
        Map<String, String> httpHeaders = context.getProperties().entrySet().stream().filter(e -> ((PropertyDescriptor)e.getKey()).getName().startsWith(REQUEST_HEADER_PREFIX)).collect(Collectors.toMap(map -> ((PropertyDescriptor)map.getKey()).getName().substring(REQUEST_HEADER_PREFIX.length()), Map.Entry::getValue));
        RestSchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, username, password, this.getLogger(), httpHeaders);
        int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
        long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
        this.client = new CachingSchemaRegistryClient(restClient, cacheSize, cacheExpiration);
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        AuthenticationType authenticationType;
        PropertyValue authenticationTypeProperty;
        List<String> baseUrls;
        List insecure;
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>();
        boolean sslContextSet = validationContext.getProperty(SSL_CONTEXT).isSet();
        if (sslContextSet && !(insecure = (baseUrls = this.getBaseURLs((PropertyContext)validationContext)).stream().filter(url -> !url.startsWith("https")).collect(Collectors.toList())).isEmpty()) {
            results.add(new ValidationResult.Builder().subject(SCHEMA_REGISTRY_URLS.getDisplayName()).input((String)insecure.get(0)).valid(false).explanation("When SSL Context is configured, all Schema Registry URL's must use HTTPS, not HTTP").build());
        }
        if ((authenticationTypeProperty = validationContext.getProperty(AUTHENTICATION_TYPE)).isSet() && AuthenticationType.BASIC.equals((Object)(authenticationType = AuthenticationType.valueOf(authenticationTypeProperty.getValue())))) {
            String password;
            String username = validationContext.getProperty(USERNAME).getValue();
            if (StringUtils.isBlank((CharSequence)username)) {
                results.add(new ValidationResult.Builder().subject(USERNAME.getDisplayName()).valid(false).explanation("Username is required for Basic Authentication").build());
            }
            if (StringUtils.isBlank((CharSequence)(password = validationContext.getProperty(PASSWORD).getValue()))) {
                results.add(new ValidationResult.Builder().subject(PASSWORD.getDisplayName()).valid(false).explanation("Password is required for Basic Authentication").build());
            }
        }
        return results;
    }

    private List<String> getBaseURLs(PropertyContext context) {
        String urls = context.getProperty(SCHEMA_REGISTRY_URLS).evaluateAttributeExpressions().getValue();
        List<String> baseUrls = Stream.of(urls.split(",")).map(url -> url.trim()).collect(Collectors.toList());
        return baseUrls;
    }

    private RecordSchema retrieveSchemaByName(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
        Optional schemaName = schemaIdentifier.getName();
        if (!schemaName.isPresent()) {
            throw new SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
        }
        RecordSchema schema = schemaIdentifier.getVersion().isPresent() ? this.client.getSchema((String)schemaName.get(), schemaIdentifier.getVersion().getAsInt()) : this.client.getSchema((String)schemaName.get());
        return schema;
    }

    private RecordSchema retrieveSchemaById(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
        OptionalLong schemaId = schemaIdentifier.getIdentifier();
        if (!schemaId.isPresent()) {
            throw new SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
        }
        RecordSchema schema = this.client.getSchema((int)schemaId.getAsLong());
        return schema;
    }

    public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
        if (schemaIdentifier.getName().isPresent()) {
            return this.retrieveSchemaByName(schemaIdentifier);
        }
        return this.retrieveSchemaById(schemaIdentifier);
    }

    public Set<SchemaField> getSuppliedSchemaFields() {
        return schemaFields;
    }
}

