/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.storage;

import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ModeUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.client.rest.utils.UrlList;
import io.confluent.kafka.schemaregistry.client.security.SslFactory;
import io.confluent.kafka.schemaregistry.exceptions.IdDoesNotMatchException;
import io.confluent.kafka.schemaregistry.exceptions.IdGenerationException;
import io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.InvalidSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.OperationNotPermittedException;
import io.confluent.kafka.schemaregistry.exceptions.ReferenceExistsException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryRequestForwardingException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaVersionNotSoftDeletedException;
import io.confluent.kafka.schemaregistry.exceptions.SubjectNotSoftDeletedException;
import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException;
import io.confluent.kafka.schemaregistry.id.IdGenerator;
import io.confluent.kafka.schemaregistry.id.IncrementalIdGenerator;
import io.confluent.kafka.schemaregistry.id.ZookeeperIdGenerator;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.KafkaGroupLeaderElector;
import io.confluent.kafka.schemaregistry.leaderelector.zookeeper.ZookeeperLeaderElector;
import io.confluent.kafka.schemaregistry.metrics.MetricsContainer;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectKey;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectValue;
import io.confluent.kafka.schemaregistry.storage.CompositeSchemaUpdateHandler;
import io.confluent.kafka.schemaregistry.storage.ConfigKey;
import io.confluent.kafka.schemaregistry.storage.ConfigValue;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectKey;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectValue;
import io.confluent.kafka.schemaregistry.storage.InMemoryCache;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreMessageHandler;
import io.confluent.kafka.schemaregistry.storage.LeaderAwareSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.LeaderElector;
import io.confluent.kafka.schemaregistry.storage.LookupCache;
import io.confluent.kafka.schemaregistry.storage.Mode;
import io.confluent.kafka.schemaregistry.storage.ModeKey;
import io.confluent.kafka.schemaregistry.storage.ModeValue;
import io.confluent.kafka.schemaregistry.storage.NoopKey;
import io.confluent.kafka.schemaregistry.storage.SchemaIdAndSubjects;
import io.confluent.kafka.schemaregistry.storage.SchemaKey;
import io.confluent.kafka.schemaregistry.storage.SchemaReference;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue;
import io.confluent.kafka.schemaregistry.storage.SchemaUpdateHandler;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.confluent.kafka.schemaregistry.storage.SubjectKey;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import io.confluent.rest.Application;
import io.confluent.rest.exceptions.RestException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.net.ssl.HostnameVerifier;
import org.I0Itec.zkclient.ZkClient;
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSchemaRegistry
implements SchemaRegistry,
LeaderAwareSchemaRegistry {
    public static final int MIN_VERSION = 1;
    public static final int MAX_VERSION = Integer.MAX_VALUE;
    private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);
    private static final long DESCRIBE_CLUSTER_TIMEOUT_MS = 10000L;
    private final SchemaRegistryConfig config;
    private final Map<String, Object> props;
    private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
    final KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore;
    private final Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer;
    private final SchemaRegistryIdentity myIdentity;
    private final CompatibilityLevel defaultCompatibilityLevel;
    private final Mode defaultMode;
    private final int kafkaStoreTimeoutMs;
    private final int initTimeout;
    private final int kafkaStoreMaxRetries;
    private final boolean isEligibleForLeaderElector;
    private final boolean allowModeChanges;
    private SchemaRegistryIdentity leaderIdentity;
    private RestService leaderRestService;
    private SslFactory sslFactory;
    private IdGenerator idGenerator = null;
    private LeaderElector leaderElector = null;
    private final MetricsContainer metricsContainer;
    private final Map<String, SchemaProvider> providers;
    private final String kafkaClusterId;

    public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer) throws SchemaRegistryException {
        if (config == null) {
            throw new SchemaRegistryException("Schema registry configuration is null");
        }
        this.config = config;
        this.props = new HashMap<String, Object>();
        Boolean leaderEligibility = config.getBoolean("master.eligibility");
        if (leaderEligibility == null) {
            leaderEligibility = config.getBoolean("leader.eligibility");
        }
        this.isEligibleForLeaderElector = leaderEligibility;
        String host = config.getString("host.name");
        SchemeAndPort schemeAndPort = KafkaSchemaRegistry.getSchemeAndPortForIdentity(config.getInt("port"), config.getList("listeners"), config.interInstanceProtocol());
        this.allowModeChanges = config.getBoolean("mode.mutability");
        this.myIdentity = new SchemaRegistryIdentity(host, schemeAndPort.port, this.isEligibleForLeaderElector, schemeAndPort.scheme);
        this.sslFactory = new SslFactory(ConfigDef.convertToStringMapWithPasswordValues((Map)config.values()));
        this.kafkaStoreTimeoutMs = config.getInt("kafkastore.timeout.ms");
        this.initTimeout = config.getInt("kafkastore.init.timeout.ms");
        this.kafkaStoreMaxRetries = config.getInt("kafkastore.write.max.retries");
        this.serializer = serializer;
        this.defaultCompatibilityLevel = config.compatibilityType();
        this.defaultMode = Mode.READWRITE;
        this.kafkaClusterId = this.kafkaClusterId(config);
        this.metricsContainer = new MetricsContainer(config, this.kafkaClusterId);
        this.providers = this.initProviders(config);
        this.lookupCache = this.lookupCache();
        this.idGenerator = this.identityGenerator(config);
        this.kafkaStore = this.kafkaStore(config);
    }

    private Map<String, SchemaProvider> initProviders(SchemaRegistryConfig config) {
        Map schemaProviderConfigs = config.originalsWithPrefix("schema.providers.");
        schemaProviderConfigs.put("schemaVersionFetcher", this);
        List<SchemaProvider> defaultSchemaProviders = Arrays.asList(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider());
        for (SchemaProvider provider : defaultSchemaProviders) {
            provider.configure(schemaProviderConfigs);
        }
        HashMap<String, SchemaProvider> providerMap = new HashMap<String, SchemaProvider>();
        this.registerProviders(providerMap, defaultSchemaProviders);
        List customSchemaProviders = config.getConfiguredInstances("schema.providers", SchemaProvider.class, schemaProviderConfigs);
        this.registerProviders(providerMap, customSchemaProviders);
        this.metricsContainer.getCustomSchemaProviderCount().set(customSchemaProviders.size());
        return providerMap;
    }

    private void registerProviders(Map<String, SchemaProvider> providerMap, List<SchemaProvider> schemaProviders) {
        for (SchemaProvider schemaProvider : schemaProviders) {
            log.info("Registering schema provider for {}: {}", (Object)schemaProvider.schemaType(), (Object)schemaProvider.getClass().getName());
            providerMap.put(schemaProvider.schemaType(), schemaProvider);
        }
    }

    protected KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore(SchemaRegistryConfig config) throws SchemaRegistryException {
        Map handlerConfigs = config.originalsWithPrefix("kafkastore.update.handlers.");
        handlerConfigs.put("schemaRegistry", this);
        List customSchemaHandlers = config.getConfiguredInstances("kafkastore.update.handlers", SchemaUpdateHandler.class, handlerConfigs);
        KafkaStoreMessageHandler storeHandler = new KafkaStoreMessageHandler(this, this.lookupCache, this.idGenerator);
        customSchemaHandlers.add(storeHandler);
        return new KafkaStore<SchemaRegistryKey, SchemaRegistryValue>(config, new CompositeSchemaUpdateHandler(customSchemaHandlers), this.serializer, this.lookupCache, new NoopKey());
    }

    protected LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache() {
        return new InMemoryCache<SchemaRegistryKey, SchemaRegistryValue>();
    }

    public LookupCache<SchemaRegistryKey, SchemaRegistryValue> getLookupCache() {
        return this.lookupCache;
    }

    public Serializer<SchemaRegistryKey, SchemaRegistryValue> getSerializer() {
        return this.serializer;
    }

    protected IdGenerator identityGenerator(SchemaRegistryConfig config) {
        IdGenerator idGenerator = config.useKafkaCoordination() ? new IncrementalIdGenerator() : new ZookeeperIdGenerator();
        idGenerator.configure(config);
        return idGenerator;
    }

    public IdGenerator getIdentityGenerator() {
        return this.idGenerator;
    }

    public MetricsContainer getMetricsContainer() {
        return this.metricsContainer;
    }

    static SchemeAndPort getSchemeAndPortForIdentity(int port, List<String> configuredListeners, String requestedScheme) throws SchemaRegistryException {
        List listeners = Application.parseListeners(configuredListeners, (int)port, Arrays.asList("http", "https"), (String)"http");
        if (requestedScheme.isEmpty()) {
            requestedScheme = "http";
        }
        for (URI listener : listeners) {
            if (!requestedScheme.equalsIgnoreCase(listener.getScheme())) continue;
            return new SchemeAndPort(listener.getScheme(), listener.getPort());
        }
        throw new SchemaRegistryException(" No listener configured with requested scheme " + requestedScheme);
    }

    @Override
    public void init() throws SchemaRegistryException {
        try {
            this.kafkaStore.init();
        }
        catch (StoreInitializationException e) {
            throw new SchemaRegistryInitializationException("Error initializing kafka store while initializing schema registry", e);
        }
        try {
            if (this.config.useKafkaCoordination()) {
                log.info("Joining schema registry with Kafka-based coordination");
                this.leaderElector = new KafkaGroupLeaderElector(this.config, this.myIdentity, this);
            } else {
                log.info("Joining schema registry with Zookeeper-based coordination");
                log.warn("*****************************************************************************");
                log.warn("Zookeeper-based coordination is deprecated and will be removed in the future.");
                log.warn("Please switch to Kafka-based coordination with \"kafkastore.bootstrap.servers\".");
                log.warn("*****************************************************************************");
                this.leaderElector = new ZookeeperLeaderElector(this.config, this.myIdentity, this);
            }
            this.saveUrlInZk();
            this.leaderElector.init();
        }
        catch (SchemaRegistryStoreException e) {
            throw new SchemaRegistryInitializationException("Error electing leader while initializing schema registry", e);
        }
        catch (SchemaRegistryTimeoutException e) {
            throw new SchemaRegistryInitializationException(e);
        }
    }

    private void saveUrlInZk() throws SchemaRegistryException {
        String path;
        ZkClient zkClient = this.config.zkUtils().zkClient();
        if (!zkClient.exists("/sr_urls")) {
            zkClient.createPersistent("/sr_urls");
        }
        if (zkClient.exists(path = String.format("%s/%s_%d", "/sr_urls", this.myIdentity.getHost(), this.myIdentity.getPort()))) {
            String errMsg = String.format("%s:%d Address already in use", this.myIdentity.getHost(), this.myIdentity.getPort());
            throw new SchemaRegistryInitializationException(errMsg);
        }
        zkClient.createEphemeral(path, (Object)this.myIdentity.getUrl());
    }

    public boolean initialized() {
        return this.kafkaStore.initialized();
    }

    public boolean isLeader() {
        this.kafkaStore.leaderLock().lock();
        try {
            if (this.leaderIdentity != null && this.leaderIdentity.equals(this.myIdentity)) {
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            this.kafkaStore.leaderLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setLeader(@Nullable SchemaRegistryIdentity newLeader) throws SchemaRegistryTimeoutException, SchemaRegistryStoreException, IdGenerationException {
        log.debug("Setting the leader to " + newLeader);
        if (newLeader != null && !newLeader.getLeaderEligibility()) {
            throw new IllegalStateException("Tried to set an ineligible node to leader: " + newLeader);
        }
        this.kafkaStore.leaderLock().lock();
        try {
            SchemaRegistryIdentity previousLeader = this.leaderIdentity;
            this.leaderIdentity = newLeader;
            if (this.leaderIdentity == null) {
                this.leaderRestService = null;
            } else {
                this.leaderRestService = new RestService(this.leaderIdentity.getUrl());
                if (this.sslFactory != null && this.sslFactory.sslContext() != null) {
                    this.leaderRestService.setSslSocketFactory(this.sslFactory.sslContext().getSocketFactory());
                    this.leaderRestService.setHostnameVerifier(this.getHostnameVerifier());
                }
            }
            if (this.leaderIdentity != null && !this.leaderIdentity.equals(previousLeader) && this.isLeader()) {
                this.kafkaStore.markLastWrittenOffsetInvalid();
                try {
                    this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(this.initTimeout);
                }
                catch (StoreException e) {
                    throw new SchemaRegistryStoreException("Exception getting latest offset ", e);
                }
                this.idGenerator.init();
            }
            this.metricsContainer.getLeaderNode().set(this.isLeader() ? 1L : 0L);
        }
        finally {
            this.kafkaStore.leaderLock().unlock();
        }
    }

    public SchemaRegistryIdentity myIdentity() {
        return this.myIdentity;
    }

    public SchemaRegistryIdentity leaderIdentity() {
        this.kafkaStore.leaderLock().lock();
        try {
            SchemaRegistryIdentity schemaRegistryIdentity = this.leaderIdentity;
            return schemaRegistryIdentity;
        }
        finally {
            this.kafkaStore.leaderLock().unlock();
        }
    }

    @Override
    public Set<String> schemaTypes() {
        return this.providers.keySet();
    }

    @Override
    public int register(String subject, Schema schema) throws SchemaRegistryException {
        try {
            this.checkRegisterMode(subject, schema);
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, this.kafkaStoreTimeoutMs);
            ParsedSchema parsedSchema = this.canonicalizeSchema(schema);
            int schemaId = schema.getId();
            SchemaIdAndSubjects schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(schema);
            if (schemaIdAndSubjects != null) {
                if (schemaId >= 0 && schemaId != schemaIdAndSubjects.getSchemaId()) {
                    throw new IdDoesNotMatchException(schemaIdAndSubjects.getSchemaId(), schema.getId());
                }
                if (schemaIdAndSubjects.hasSubject(subject) && !this.isSubjectVersionDeleted(subject, schemaIdAndSubjects.getVersion(subject))) {
                    return schemaIdAndSubjects.getSchemaId();
                }
                schemaId = schemaIdAndSubjects.getSchemaId();
            }
            List<SchemaValue> allVersions = this.getAllSchemaValues(subject);
            Collections.reverse(allVersions);
            ArrayList<SchemaValue> deletedVersions = new ArrayList<SchemaValue>();
            ArrayList<ParsedSchema> undeletedVersions = new ArrayList<ParsedSchema>();
            int newVersion = 1;
            for (SchemaValue schemaValue : allVersions) {
                newVersion = Math.max(newVersion, schemaValue.getVersion() + 1);
                if (schemaValue.isDeleted()) {
                    deletedVersions.add(schemaValue);
                    continue;
                }
                ParsedSchema undeletedSchema = this.parseSchema(this.getSchemaEntityFromSchemaValue(schemaValue));
                if (parsedSchema.references().isEmpty() && !undeletedSchema.references().isEmpty() && parsedSchema.deepEquals(undeletedSchema)) {
                    return schemaValue.getId();
                }
                undeletedVersions.add(undeletedSchema);
            }
            Collections.reverse(undeletedVersions);
            boolean isCompatible = this.isCompatibleWithPrevious(subject, parsedSchema, undeletedVersions);
            schema.setSchema(parsedSchema.canonicalString());
            schema.setReferences(parsedSchema.references());
            if (isCompatible) {
                if (schema.getVersion() <= 0) {
                    schema.setVersion(Integer.valueOf(newVersion));
                }
                SchemaKey schemaKey = new SchemaKey(subject, schema.getVersion());
                if (schemaId >= 0) {
                    schema.setId(Integer.valueOf(schemaId));
                    this.kafkaStore.put(schemaKey, new SchemaValue(schema));
                } else {
                    int retries = 0;
                    while (retries++ < this.kafkaStoreMaxRetries) {
                        int newId = this.idGenerator.id(schema);
                        if (this.lookupCache.schemaKeyById(newId) != null) continue;
                        schema.setId(Integer.valueOf(newId));
                        if (retries > 1) {
                            log.warn(String.format("Retrying to register the schema with ID %s", newId));
                        }
                        this.kafkaStore.put(schemaKey, new SchemaValue(schema));
                        break;
                    }
                    if (retries >= this.kafkaStoreMaxRetries) {
                        throw new SchemaRegistryStoreException("Error while registering the schema due to generating an ID that is already in use.");
                    }
                }
                for (SchemaValue schemaValue : deletedVersions) {
                    if (!schemaValue.getId().equals(schema.getId())) continue;
                    SchemaKey key = new SchemaKey(schemaValue.getSubject(), schemaValue.getVersion());
                    this.kafkaStore.delete(key);
                }
                return schema.getId();
            }
            throw new IncompatibleSchemaException("New schema is incompatible with an earlier schema.");
        }
        catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while registering the schema in the backend Kafka store", e);
        }
    }

    private void checkRegisterMode(String subject, Schema schema) throws OperationNotPermittedException, SchemaRegistryStoreException {
        if (this.getModeInScope(subject) == Mode.READONLY) {
            throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
        }
        if (schema.getId() >= 0 || schema.getVersion() > 0) {
            if (this.getModeInScope(subject) != Mode.IMPORT) {
                throw new OperationNotPermittedException("Subject " + subject + " is not in import mode");
            }
        } else if (this.getModeInScope(subject) != Mode.READWRITE) {
            throw new OperationNotPermittedException("Subject " + subject + " is not in read-write mode");
        }
    }

    public int registerOrForward(String subject, Schema schema, Map<String, String> headerProperties) throws SchemaRegistryException {
        Schema existingSchema = this.lookUpSchemaUnderSubject(subject, schema, false);
        if (existingSchema != null) {
            if (schema.getId() != null && schema.getId() >= 0 && !schema.getId().equals(existingSchema.getId())) {
                throw new IdDoesNotMatchException(existingSchema.getId(), schema.getId());
            }
            return existingSchema.getId();
        }
        this.kafkaStore.lockFor(subject).lock();
        try {
            if (this.isLeader()) {
                int n = this.register(subject, schema);
                return n;
            }
            if (this.leaderIdentity != null) {
                int n = this.forwardRegisterRequestToLeader(subject, schema, headerProperties);
                return n;
            }
            throw new UnknownLeaderException("Register schema request failed since leader is unknown");
        }
        finally {
            this.kafkaStore.lockFor(subject).unlock();
        }
    }

    @Override
    public void deleteSchemaVersion(String subject, Schema schema, boolean permanentDelete) throws SchemaRegistryException {
        try {
            if (this.getModeInScope(subject) == Mode.READONLY) {
                throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
            }
            SchemaKey key = new SchemaKey(subject, schema.getVersion());
            if (!this.lookupCache.referencesSchema(key).isEmpty()) {
                throw new ReferenceExistsException(key.toString());
            }
            SchemaValue schemaValue = (SchemaValue)this.lookupCache.get(key);
            if (permanentDelete && !schemaValue.isDeleted()) {
                throw new SchemaVersionNotSoftDeletedException(subject, schema.getVersion().toString());
            }
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, this.kafkaStoreTimeoutMs);
            if (!permanentDelete) {
                schemaValue = new SchemaValue(schema);
                schemaValue.setDeleted(true);
                this.kafkaStore.put(key, schemaValue);
                if (!this.getAllVersions(subject, false).hasNext()) {
                    if (this.getMode(subject) != null) {
                        this.deleteMode(subject);
                    }
                    if (this.getCompatibilityLevel(subject) != null) {
                        this.deleteSubjectCompatibility(subject);
                    }
                }
            } else {
                this.kafkaStore.put(key, null);
            }
        }
        catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while deleting the schema for subject '" + subject + "' in the backend Kafka store", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void deleteSchemaVersionOrForward(Map<String, String> headerProperties, String subject, Schema schema, boolean permanentDelete) throws SchemaRegistryException {
        block5: {
            this.kafkaStore.lockFor(subject).lock();
            try {
                if (this.isLeader()) {
                    this.deleteSchemaVersion(subject, schema, permanentDelete);
                    break block5;
                }
                if (this.leaderIdentity != null) {
                    this.forwardDeleteSchemaVersionRequestToLeader(headerProperties, subject, schema.getVersion(), permanentDelete);
                    break block5;
                }
                throw new UnknownLeaderException("Register schema request failed since leader is unknown");
            }
            finally {
                this.kafkaStore.lockFor(subject).unlock();
            }
        }
    }

    @Override
    public List<Integer> deleteSubject(String subject, boolean permanentDelete) throws SchemaRegistryException {
        try {
            SubjectKey key;
            if (this.getModeInScope(subject) == Mode.READONLY) {
                throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
            }
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, this.kafkaStoreTimeoutMs);
            ArrayList<Integer> deletedVersions = new ArrayList<Integer>();
            int deleteWatermarkVersion = 0;
            Iterator<Schema> schemasToBeDeleted = this.getAllVersions(subject, permanentDelete);
            while (schemasToBeDeleted.hasNext()) {
                SchemaValue schemaValue;
                deleteWatermarkVersion = schemasToBeDeleted.next().getVersion();
                key = new SchemaKey(subject, deleteWatermarkVersion);
                if (!this.lookupCache.referencesSchema((SchemaKey)key).isEmpty()) {
                    throw new ReferenceExistsException(((SchemaKey)key).toString());
                }
                if (permanentDelete && !(schemaValue = (SchemaValue)this.lookupCache.get(key)).isDeleted()) {
                    throw new SubjectNotSoftDeletedException(subject);
                }
                deletedVersions.add(deleteWatermarkVersion);
            }
            if (!permanentDelete) {
                key = new DeleteSubjectKey(subject);
                DeleteSubjectValue value = new DeleteSubjectValue(subject, deleteWatermarkVersion);
                this.kafkaStore.put(key, value);
                if (this.getMode(subject) != null) {
                    this.deleteMode(subject);
                }
                if (this.getCompatibilityLevel(subject) != null) {
                    this.deleteSubjectCompatibility(subject);
                }
            } else {
                for (Integer version : deletedVersions) {
                    this.kafkaStore.put(new SchemaKey(subject, version), null);
                }
            }
            return deletedVersions;
        }
        catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while deleting the subject in the backend Kafka store", e);
        }
    }

    public List<Integer> deleteSubjectOrForward(Map<String, String> requestProperties, String subject, boolean permanentDelete) throws SchemaRegistryException {
        this.kafkaStore.lockFor(subject).lock();
        try {
            if (this.isLeader()) {
                List<Integer> list = this.deleteSubject(subject, permanentDelete);
                return list;
            }
            if (this.leaderIdentity != null) {
                List<Integer> list = this.forwardDeleteSubjectRequestToLeader(requestProperties, subject, permanentDelete);
                return list;
            }
            throw new UnknownLeaderException("Register schema request failed since leader is unknown");
        }
        finally {
            this.kafkaStore.lockFor(subject).unlock();
        }
    }

    @Override
    public Schema lookUpSchemaUnderSubject(String subject, Schema schema, boolean lookupDeletedSchema) throws SchemaRegistryException {
        ParsedSchema parsedSchema = this.canonicalizeSchema(schema);
        SchemaIdAndSubjects schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(schema);
        if (schemaIdAndSubjects != null && schemaIdAndSubjects.hasSubject(subject) && (lookupDeletedSchema || !this.isSubjectVersionDeleted(subject, schemaIdAndSubjects.getVersion(subject)))) {
            Schema matchingSchema = new Schema(subject, Integer.valueOf(schemaIdAndSubjects.getVersion(subject)), Integer.valueOf(schemaIdAndSubjects.getSchemaId()), schema.getSchemaType(), schema.getReferences(), schema.getSchema());
            return matchingSchema;
        }
        List<SchemaValue> allVersions = this.getAllSchemaValues(subject);
        Collections.reverse(allVersions);
        for (SchemaValue schemaValue : allVersions) {
            Schema undeleted;
            ParsedSchema undeletedSchema;
            if (schemaValue.isDeleted() || !parsedSchema.references().isEmpty() || schemaValue.getReferences().isEmpty() || !parsedSchema.deepEquals(undeletedSchema = this.parseSchema(undeleted = this.getSchemaEntityFromSchemaValue(schemaValue)))) continue;
            return undeleted;
        }
        return null;
    }

    private int forwardRegisterRequestToLeader(String subject, Schema schema, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.leaderRestService.getBaseUrls();
        RegisterSchemaRequest registerSchemaRequest = new RegisterSchemaRequest();
        registerSchemaRequest.setSchema(schema.getSchema());
        registerSchemaRequest.setSchemaType(schema.getSchemaType());
        registerSchemaRequest.setReferences(schema.getReferences());
        registerSchemaRequest.setVersion(schema.getVersion());
        registerSchemaRequest.setId(schema.getId());
        log.debug(String.format("Forwarding registering schema request %s to %s", registerSchemaRequest, baseUrl));
        try {
            int id = this.leaderRestService.registerSchema(headerProperties, registerSchemaRequest, subject);
            return id;
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the registering schema request %s to %s", registerSchemaRequest, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private void forwardUpdateCompatibilityLevelRequestToLeader(String subject, CompatibilityLevel compatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.leaderRestService.getBaseUrls();
        ConfigUpdateRequest configUpdateRequest = new ConfigUpdateRequest();
        configUpdateRequest.setCompatibilityLevel(compatibilityLevel.name);
        log.debug(String.format("Forwarding update config request %s to %s", configUpdateRequest, baseUrl));
        try {
            this.leaderRestService.updateConfig(headerProperties, configUpdateRequest, subject);
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update config request %s to %s", configUpdateRequest, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private void forwardDeleteSchemaVersionRequestToLeader(Map<String, String> headerProperties, String subject, Integer version, boolean permanentDelete) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.leaderRestService.getBaseUrls();
        log.debug(String.format("Forwarding deleteSchemaVersion schema version request %s-%s to %s", subject, version, baseUrl));
        try {
            this.leaderRestService.deleteSchemaVersion(headerProperties, subject, String.valueOf(version), permanentDelete);
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding deleteSchemaVersion schema version request %s-%s to %s", subject, version, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private List<Integer> forwardDeleteSubjectRequestToLeader(Map<String, String> requestProperties, String subject, boolean permanentDelete) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.leaderRestService.getBaseUrls();
        log.debug(String.format("Forwarding delete subject request for  %s to %s", subject, baseUrl));
        try {
            return this.leaderRestService.deleteSubject(requestProperties, subject, permanentDelete);
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding delete subject request %s to %s", subject, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private void forwardSetModeRequestToLeader(String subject, Mode mode, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.leaderRestService.getBaseUrls();
        ModeUpdateRequest modeUpdateRequest = new ModeUpdateRequest();
        modeUpdateRequest.setMode(mode.name());
        log.debug(String.format("Forwarding update mode request %s to %s", modeUpdateRequest, baseUrl));
        try {
            this.leaderRestService.setMode(headerProperties, modeUpdateRequest, subject);
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update mode request %s to %s", modeUpdateRequest, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private ParsedSchema canonicalizeSchema(Schema schema) throws InvalidSchemaException {
        if (schema == null || schema.getSchema() == null || schema.getSchema().trim().isEmpty()) {
            log.error("Empty schema");
            throw new InvalidSchemaException("Empty schema");
        }
        ParsedSchema parsedSchema = this.parseSchema(schema);
        try {
            parsedSchema.validate();
        }
        catch (Exception e) {
            String errMsg = "Invalid schema " + schema;
            log.error(errMsg, (Throwable)e);
            throw new InvalidSchemaException(errMsg, e);
        }
        schema.setSchema(parsedSchema.canonicalString());
        schema.setReferences(parsedSchema.references());
        return parsedSchema;
    }

    private ParsedSchema parseSchema(Schema schema) throws InvalidSchemaException {
        return this.parseSchema(schema.getSchemaType(), schema.getSchema(), schema.getReferences());
    }

    public ParsedSchema parseSchema(String schemaType, String schema, List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference> references) throws InvalidSchemaException {
        SchemaProvider provider;
        if (schemaType == null) {
            schemaType = "AVRO";
        }
        if ((provider = this.providers.get(schemaType)) == null) {
            String errMsg = "Invalid schema type " + schemaType;
            log.error(errMsg);
            throw new InvalidSchemaException(errMsg);
        }
        String type = schemaType;
        ParsedSchema parsedSchema = (ParsedSchema)provider.parseSchema(schema, references).orElseThrow(() -> new InvalidSchemaException("Invalid schema " + schema + " with refs " + references + " of type " + type));
        return parsedSchema;
    }

    public Schema validateAndGetSchema(String subject, VersionId versionId, boolean returnDeletedSchema) throws SchemaRegistryException {
        int version = versionId.getVersionId();
        Schema schema = this.get(subject, version, returnDeletedSchema);
        if (schema == null) {
            if (!this.hasSubjects(subject, returnDeletedSchema)) {
                throw Errors.subjectNotFoundException(subject);
            }
            throw Errors.versionNotFoundException(version);
        }
        return schema;
    }

    public boolean schemaVersionExists(String subject, VersionId versionId, boolean returnDeletedSchema) throws SchemaRegistryException {
        int version = versionId.getVersionId();
        Schema schema = this.get(subject, version, returnDeletedSchema);
        return schema != null;
    }

    @Override
    public Schema get(String subject, int version, boolean returnDeletedSchema) throws SchemaRegistryException {
        VersionId versionId = new VersionId(version);
        if (versionId.isLatest()) {
            return this.getLatestVersion(subject);
        }
        SchemaKey key = new SchemaKey(subject, version);
        try {
            SchemaValue schemaValue = (SchemaValue)this.kafkaStore.get(key);
            Schema schema = null;
            if (schemaValue != null && !schemaValue.isDeleted() || returnDeletedSchema) {
                schema = this.getSchemaEntityFromSchemaValue(schemaValue);
            }
            return schema;
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema from the backend Kafka store", e);
        }
    }

    @Override
    public SchemaString get(int id) throws SchemaRegistryException {
        return this.get(id, null, false);
    }

    public SchemaString get(int id, String format, boolean fetchMaxId) throws SchemaRegistryException {
        SchemaValue schema = null;
        try {
            SchemaKey subjectVersionKey = this.lookupCache.schemaKeyById(id);
            if (subjectVersionKey == null) {
                return null;
            }
            schema = (SchemaValue)this.kafkaStore.get(subjectVersionKey);
            if (schema == null) {
                return null;
            }
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema with id " + id + " from the backend Kafka store", e);
        }
        SchemaString schemaString = new SchemaString();
        schemaString.setSchemaType(schema.getSchemaType());
        List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference> refs = schema.getReferences() != null ? schema.getReferences().stream().map(ref -> new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(ref.getName(), ref.getSubject(), ref.getVersion())).collect(Collectors.toList()) : null;
        schemaString.setReferences(refs);
        if (format != null && !format.trim().isEmpty()) {
            ParsedSchema parsedSchema = this.parseSchema(schema.getSchemaType(), schema.getSchema(), refs);
            schemaString.setSchemaString(parsedSchema.formattedString(format));
        } else {
            schemaString.setSchemaString(schema.getSchema());
        }
        if (fetchMaxId) {
            schemaString.setMaxId(Integer.valueOf(this.idGenerator.getMaxId(id)));
        }
        return schemaString;
    }

    public List<Integer> getReferencedBy(String subject, VersionId versionId) throws SchemaRegistryException {
        int version = versionId.getVersionId();
        if (versionId.isLatest()) {
            version = this.getLatestVersion(subject).getVersion();
        }
        SchemaKey key = new SchemaKey(subject, version);
        ArrayList<Integer> ids = new ArrayList<Integer>(this.lookupCache.referencesSchema(key));
        Collections.sort(ids);
        return ids;
    }

    @Override
    public Set<String> listSubjects(boolean returnDeletedSubjects) throws SchemaRegistryException {
        try {
            Iterator<SchemaRegistryKey> allKeys = this.kafkaStore.getAllKeys();
            return this.extractUniqueSubjects(allKeys, returnDeletedSubjects);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    public Set<String> listSubjectsForId(int id) throws SchemaRegistryException {
        SchemaValue schema = null;
        try {
            SchemaKey subjectVersionKey = this.lookupCache.schemaKeyById(id);
            if (subjectVersionKey == null) {
                return null;
            }
            schema = (SchemaValue)this.kafkaStore.get(subjectVersionKey);
            if (schema == null) {
                return null;
            }
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema with id " + id + " from the backend Kafka store", e);
        }
        return this.lookupCache.schemaIdAndSubjects(new Schema(schema.getSubject(), schema.getVersion(), schema.getId(), schema.getSchemaType(), schema.getReferences().stream().map(ref -> new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(ref.getName(), ref.getSubject(), ref.getVersion())).collect(Collectors.toList()), schema.getSchema())).allSubjects();
    }

    public List<SubjectVersion> listVersionsForId(int id) throws SchemaRegistryException {
        SchemaValue schema = null;
        try {
            SchemaKey subjectVersionKey = this.lookupCache.schemaKeyById(id);
            if (subjectVersionKey == null) {
                return null;
            }
            schema = (SchemaValue)this.kafkaStore.get(subjectVersionKey);
            if (schema == null) {
                return null;
            }
        }
        catch (StoreException e2) {
            throw new SchemaRegistryStoreException("Error while retrieving schema with id " + id + " from the backend Kafka store", e2);
        }
        return this.lookupCache.schemaIdAndSubjects(new Schema(schema.getSubject(), schema.getVersion(), schema.getId(), schema.getSchemaType(), schema.getReferences().stream().map(ref -> new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(ref.getName(), ref.getSubject(), ref.getVersion())).collect(Collectors.toList()), schema.getSchema())).allSubjectVersions().entrySet().stream().map(e -> new SubjectVersion((String)e.getKey(), (Integer)e.getValue())).collect(Collectors.toList());
    }

    private Set<String> extractUniqueSubjects(Iterator<SchemaRegistryKey> allKeys, boolean returnDeletedSubjects) throws StoreException {
        HashSet<String> subjects = new HashSet<String>();
        while (allKeys.hasNext()) {
            SchemaKey key;
            SchemaValue value;
            SchemaRegistryKey k = allKeys.next();
            if (!(k instanceof SchemaKey) || (value = (SchemaValue)this.kafkaStore.get(key = (SchemaKey)k)) == null || value.isDeleted() && !returnDeletedSubjects) continue;
            subjects.add(key.getSubject());
        }
        return subjects;
    }

    public boolean hasSubjects(String subject, boolean lookupDeletedSubjects) throws SchemaRegistryStoreException {
        try {
            return this.lookupCache.hasSubjects(subject, lookupDeletedSubjects);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    @Override
    public Iterator<Schema> getAllVersions(String subject, boolean returnDeletedSchemas) throws SchemaRegistryException {
        return this.sortSchemasByVersion(this.allVersions(subject, false), returnDeletedSchemas).iterator();
    }

    public Iterator<Schema> getAllVersionsWithPrefix(String prefix, boolean returnDeletedSchemas) throws SchemaRegistryException {
        return this.sortSchemasByVersion(this.allVersions(prefix, true), returnDeletedSchemas).iterator();
    }

    private List<SchemaValue> getAllSchemaValues(String subject) throws SchemaRegistryException {
        return this.sortSchemaValuesByVersion(this.allVersions(subject, false));
    }

    @Override
    public Schema getLatestVersion(String subject) throws SchemaRegistryException {
        List<Schema> sortedVersions = this.sortSchemasByVersion(this.allVersions(subject, false), false);
        return sortedVersions.size() > 0 ? sortedVersions.get(sortedVersions.size() - 1) : null;
    }

    private Iterator<SchemaRegistryValue> allVersions(String subjectOrPrefix, boolean isPrefix) throws SchemaRegistryException {
        try {
            String start = subjectOrPrefix;
            String end = isPrefix ? subjectOrPrefix + '\uffff' : subjectOrPrefix;
            SchemaKey key1 = new SchemaKey(start, 1);
            SchemaKey key2 = new SchemaKey(end, Integer.MAX_VALUE);
            return this.kafkaStore.getAll(key1, key2);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    @Override
    public void close() {
        log.info("Shutting down schema registry");
        this.kafkaStore.close();
        if (this.leaderElector != null) {
            this.leaderElector.close();
        }
    }

    public void updateCompatibilityLevel(String subject, CompatibilityLevel newCompatibilityLevel) throws SchemaRegistryStoreException, OperationNotPermittedException, UnknownLeaderException {
        if (this.getModeInScope(subject) == Mode.READONLY) {
            throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
        }
        ConfigKey configKey = new ConfigKey(subject);
        try {
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, this.kafkaStoreTimeoutMs);
            this.kafkaStore.put(configKey, new ConfigValue(newCompatibilityLevel));
            log.debug("Wrote new compatibility level: " + newCompatibilityLevel.name + " to the Kafka data store with key " + configKey.toString());
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Failed to write new config value to the store", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateConfigOrForward(String subject, CompatibilityLevel newCompatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, UnknownLeaderException, OperationNotPermittedException {
        block5: {
            this.kafkaStore.lockFor(subject).lock();
            try {
                if (this.isLeader()) {
                    this.updateCompatibilityLevel(subject, newCompatibilityLevel);
                    break block5;
                }
                if (this.leaderIdentity != null) {
                    this.forwardUpdateCompatibilityLevelRequestToLeader(subject, newCompatibilityLevel, headerProperties);
                    break block5;
                }
                throw new UnknownLeaderException("Update config request failed since leader is unknown");
            }
            finally {
                this.kafkaStore.lockFor(subject).unlock();
            }
        }
    }

    private String kafkaClusterId(SchemaRegistryConfig config) throws SchemaRegistryException {
        String string;
        block8: {
            Properties adminClientProps = new Properties();
            KafkaStore.addSchemaRegistryConfigsToClientProperties(config, adminClientProps);
            AdminClient adminClient = AdminClient.create((Properties)adminClientProps);
            try {
                string = (String)adminClient.describeCluster().clusterId().get(10000L, TimeUnit.MILLISECONDS);
                if (adminClient == null) break block8;
            }
            catch (Throwable throwable) {
                try {
                    if (adminClient != null) {
                        try {
                            adminClient.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (InterruptedException | ExecutionException | TimeoutException e) {
                    throw new SchemaRegistryException("Failed to get Kafka cluster ID", e);
                }
            }
            adminClient.close();
        }
        return string;
    }

    public String getKafkaClusterId() {
        return this.kafkaClusterId;
    }

    public CompatibilityLevel getCompatibilityLevel(String subject) throws SchemaRegistryStoreException {
        return this.lookupCache.compatibilityLevel(subject, false, this.defaultCompatibilityLevel);
    }

    public CompatibilityLevel getCompatibilityLevelInScope(String subject) throws SchemaRegistryStoreException {
        return this.lookupCache.compatibilityLevel(subject, true, this.defaultCompatibilityLevel);
    }

    @Override
    public boolean isCompatible(String subject, Schema newSchema, Schema latestSchema) throws SchemaRegistryException {
        if (latestSchema == null) {
            log.error("Lastest schema not provided");
            throw new InvalidSchemaException("Latest schema not provided");
        }
        return this.isCompatible(subject, newSchema, Collections.singletonList(latestSchema));
    }

    @Override
    public boolean isCompatible(String subject, Schema newSchema, List<Schema> previousSchemas) throws SchemaRegistryException {
        if (previousSchemas == null) {
            log.error("Previous schema not provided");
            throw new InvalidSchemaException("Previous schema not provided");
        }
        ArrayList<ParsedSchema> prevParsedSchemas = new ArrayList<ParsedSchema>(previousSchemas.size());
        for (Schema previousSchema : previousSchemas) {
            ParsedSchema prevParsedSchema = this.parseSchema(previousSchema);
            prevParsedSchemas.add(prevParsedSchema);
        }
        return this.isCompatibleWithPrevious(subject, this.parseSchema(newSchema), prevParsedSchemas);
    }

    private boolean isCompatibleWithPrevious(String subject, ParsedSchema parsedSchema, List<ParsedSchema> previousSchemas) throws SchemaRegistryException {
        CompatibilityLevel compatibility = this.getCompatibilityLevelInScope(subject);
        return parsedSchema.isCompatible(compatibility, previousSchemas);
    }

    private void deleteMode(String subject) throws StoreException {
        ModeKey modeKey = new ModeKey(subject);
        this.kafkaStore.delete(modeKey);
    }

    private void deleteSubjectCompatibility(String subject) throws StoreException {
        ConfigKey configKey = new ConfigKey(subject);
        this.kafkaStore.delete(configKey);
    }

    public Mode getMode(String subject) throws SchemaRegistryStoreException {
        return this.lookupCache.mode(subject, false, this.defaultMode);
    }

    private Mode getModeInScope(String subject) throws SchemaRegistryStoreException {
        return this.lookupCache.mode(subject, true, this.defaultMode);
    }

    public void setMode(String subject, Mode mode) throws SchemaRegistryStoreException, OperationNotPermittedException {
        if (!this.allowModeChanges) {
            throw new OperationNotPermittedException("Mode changes are not allowed");
        }
        ModeKey modeKey = new ModeKey(subject);
        try {
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, this.kafkaStoreTimeoutMs);
            if (mode == Mode.IMPORT && this.getMode(subject) != Mode.IMPORT) {
                if (this.hasSubjects(subject, false)) {
                    throw new OperationNotPermittedException("Cannot import since found existing subjects");
                }
                this.kafkaStore.put(new ClearSubjectKey(subject), new ClearSubjectValue(subject));
            }
            this.kafkaStore.put(modeKey, new ModeValue(mode));
            log.debug("Wrote new mode: " + mode.name() + " to the Kafka data store with key " + modeKey.toString());
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Failed to write new mode to the store", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setModeOrForward(String subject, Mode mode, Map<String, String> headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, OperationNotPermittedException, UnknownLeaderException {
        block5: {
            this.kafkaStore.lockFor(subject).lock();
            try {
                if (this.isLeader()) {
                    this.setMode(subject, mode);
                    break block5;
                }
                if (this.leaderIdentity != null) {
                    this.forwardSetModeRequestToLeader(subject, mode, headerProperties);
                    break block5;
                }
                throw new UnknownLeaderException("Update mode request failed since leader is unknown");
            }
            finally {
                this.kafkaStore.lockFor(subject).unlock();
            }
        }
    }

    KafkaStore<SchemaRegistryKey, SchemaRegistryValue> getKafkaStore() {
        return this.kafkaStore;
    }

    private List<Schema> sortSchemasByVersion(Iterator<SchemaRegistryValue> schemas, boolean returnDeletedSchemas) {
        ArrayList<Schema> schemaList = new ArrayList<Schema>();
        while (schemas.hasNext()) {
            SchemaValue schemaValue = (SchemaValue)schemas.next();
            if (!returnDeletedSchemas && schemaValue.isDeleted()) continue;
            schemaList.add(this.getSchemaEntityFromSchemaValue(schemaValue));
        }
        Collections.sort(schemaList);
        return schemaList;
    }

    private List<SchemaValue> sortSchemaValuesByVersion(Iterator<SchemaRegistryValue> schemas) {
        ArrayList<SchemaValue> schemaList = new ArrayList<SchemaValue>();
        while (schemas.hasNext()) {
            SchemaValue schemaValue = (SchemaValue)schemas.next();
            schemaList.add(schemaValue);
        }
        Collections.sort(schemaList);
        return schemaList;
    }

    private Schema getSchemaEntityFromSchemaValue(SchemaValue schemaValue) {
        if (schemaValue == null) {
            return null;
        }
        List<SchemaReference> refs = schemaValue.getReferences();
        return new Schema(schemaValue.getSubject(), schemaValue.getVersion(), schemaValue.getId(), schemaValue.getSchemaType(), refs == null ? null : refs.stream().map(ref -> new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(ref.getName(), ref.getSubject(), ref.getVersion())).collect(Collectors.toList()), schemaValue.getSchema());
    }

    private boolean isSubjectVersionDeleted(String subject, int version) throws SchemaRegistryException {
        try {
            SchemaValue schemaValue = (SchemaValue)this.kafkaStore.get(new SchemaKey(subject, version));
            return schemaValue == null || schemaValue.isDeleted();
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema from the backend Kafka store", e);
        }
    }

    @Override
    public SchemaRegistryConfig config() {
        return this.config;
    }

    @Override
    public Map<String, Object> properties() {
        return this.props;
    }

    public HostnameVerifier getHostnameVerifier() throws SchemaRegistryStoreException {
        String sslEndpointIdentificationAlgo = this.config.getString("ssl.endpoint.identification.algorithm");
        if (sslEndpointIdentificationAlgo == null || sslEndpointIdentificationAlgo.equals("none") || sslEndpointIdentificationAlgo.isEmpty()) {
            return (hostname, session) -> true;
        }
        if (sslEndpointIdentificationAlgo.equalsIgnoreCase("https")) {
            return null;
        }
        throw new SchemaRegistryStoreException("ssl.endpoint.identification.algorithm " + sslEndpointIdentificationAlgo + " not supported");
    }

    public static class SchemeAndPort {
        public int port;
        public String scheme;

        public SchemeAndPort(String scheme, int port) {
            this.port = port;
            this.scheme = scheme;
        }
    }
}

