/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.common.config.ConfigDef;
import io.confluent.kafkarest.SystemTime;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.Versions;
import io.confluent.rest.RestConfig;
import io.confluent.rest.RestConfigException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterable;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class KafkaRestConfig
extends RestConfig {
    // Class logger.
    private static final Logger log = LoggerFactory.getLogger(KafkaRestConfig.class);

    // ---- Server identity and cluster discovery -------------------------------------------
    // Each setting is a triple: the property key (*_CONFIG), the human-readable text that is
    // rendered by ConfigDef.toRst() (*_DOC), and the default value (*_DEFAULT).

    // "id": identifier of this REST server instance; empty by default.
    public static final String ID_CONFIG = "id";
    private static final String ID_CONFIG_DOC = "Unique ID for this REST server instance. This is used in generating unique IDs for consumers that do not specify their ID. The ID is empty by default, which makes a single server setup easier to get up and running, but is not safe for multi-server deployments where automatic consumer IDs are used.";
    public static final String ID_DEFAULT = "";
    // "host.name": host name used when building absolute URLs; empty by default.
    public static final String HOST_NAME_CONFIG = "host.name";
    private static final String HOST_NAME_DOC = "The host name used to generate absolute URLs in responses. If empty, the default canonical hostname is used";
    public static final String HOST_NAME_DEFAULT = "";
    // "zookeeper.connect": ZooKeeper connection string; read by bootstrapBrokers() when no
    // bootstrap servers are configured.
    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
    private static final String ZOOKEEPER_CONNECT_DOC = "NOTE: Only required when using v1 Consumer API's. Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.\n\nThe server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in its connection string. For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path. ";
    public static final String ZOOKEEPER_CONNECT_DEFAULT = "";
    // "bootstrap.servers": explicit broker list; returned verbatim by bootstrapBrokers() when
    // it is not blank.
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    private static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping\u2014this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).";
    public static final String BOOTSTRAP_SERVERS_DEFAULT = "";
    // "schema.registry.url": base URL of the schema registry.
    public static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
    private static final String SCHEMA_REGISTRY_URL_DOC = "The base URL for the schema registry that should be used by the Avro serializer.";
    private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
    // ---- Producer / consumer tuning ------------------------------------------------------
    // Several numeric defaults below are declared as Strings even though the settings are
    // registered with ConfigDef.Type.INT in baseKafkaRestConfigDef().
    // NOTE(review): this relies on ConfigDef parsing String defaults into the declared type - confirm.

    public static final String PRODUCER_THREADS_CONFIG = "producer.threads";
    private static final String PRODUCER_THREADS_DOC = "Number of threads to run produce requests on.";
    public static final String PRODUCER_THREADS_DEFAULT = "5";
    public static final String CONSUMER_ITERATOR_TIMEOUT_MS_CONFIG = "consumer.iterator.timeout.ms";
    private static final String CONSUMER_ITERATOR_TIMEOUT_MS_DOC = "Timeout for blocking consumer iterator operations. This should be set to a small enough value that it is possible to effectively peek() on the iterator.";
    public static final String CONSUMER_ITERATOR_TIMEOUT_MS_DEFAULT = "1";
    public static final String CONSUMER_ITERATOR_BACKOFF_MS_CONFIG = "consumer.iterator.backoff.ms";
    private static final String CONSUMER_ITERATOR_BACKOFF_MS_DOC = "Amount of time to backoff when an iterator runs out of data. If a consumer has a dedicated worker thread, this is effectively the maximum error for the entire request timeout. It should be small enough to closely target the timeout, but large enough to avoid busy waiting.";
    public static final String CONSUMER_ITERATOR_BACKOFF_MS_DEFAULT = "50";
    public static final String CONSUMER_REQUEST_TIMEOUT_MS_CONFIG = "consumer.request.timeout.ms";
    private static final String CONSUMER_REQUEST_TIMEOUT_MS_DOC = "The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached.";
    public static final String CONSUMER_REQUEST_TIMEOUT_MS_DEFAULT = "1";
    public static final String CONSUMER_REQUEST_MAX_BYTES_CONFIG = "consumer.request.max.bytes";
    private static final String CONSUMER_REQUEST_MAX_BYTES_DOC = "Maximum number of bytes in unencoded message keys and values returned by a single request. This can be used by administrators to limit the memory used by a single consumer and to control the memory usage required to decode responses on clients that cannot perform a streaming decode. Note that the actual payload will be larger due to overhead from base64 encoding the response data and from JSON encoding the entire response.";
    // 0x4000000 bytes = 64 MiB.
    public static final long CONSUMER_REQUEST_MAX_BYTES_DEFAULT = 0x4000000L;
    public static final String CONSUMER_THREADS_CONFIG = "consumer.threads";
    private static final String CONSUMER_THREADS_DOC = "Number of threads to run consumer requests on.";
    public static final String CONSUMER_THREADS_DEFAULT = "1";
    public static final String CONSUMER_INSTANCE_TIMEOUT_MS_CONFIG = "consumer.instance.timeout.ms";
    private static final String CONSUMER_INSTANCE_TIMEOUT_MS_DOC = "Amount of idle time before a consumer instance is automatically destroyed.";
    public static final String CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT = "300000";

    // ---- SimpleConsumer pool and record cache --------------------------------------------
    public static final String SIMPLE_CONSUMER_MAX_POOL_SIZE_CONFIG = "simpleconsumer.pool.size.max";
    private static final String SIMPLE_CONSUMER_MAX_POOL_SIZE_DOC = "Maximum number of SimpleConsumers that can be instantiated per broker. If 0, then the pool size is not limited.";
    public static final String SIMPLE_CONSUMER_MAX_POOL_SIZE_DEFAULT = "25";
    public static final String SIMPLE_CONSUMER_POOL_TIMEOUT_MS_CONFIG = "simpleconsumer.pool.timeout.ms";
    private static final String SIMPLE_CONSUMER_POOL_TIMEOUT_MS_DOC = "Amount of time to wait for an available SimpleConsumer from the pool before failing. Use 0 for no timeout";
    public static final String SIMPLE_CONSUMER_POOL_TIMEOUT_MS_DEFAULT = "1000";
    public static final String SIMPLE_CONSUMER_MAX_POLL_TIME_CONFIG = "simpleconsumer.max.poll.time";
    private static final String SIMPLE_CONSUMER_MAX_POLL_TIME_DOC = "Maximum amount of time to poll for records by a consumer.";
    public static final int SIMPLE_CONSUMER_MAX_POLL_TIME_DEFAULT = 1000;
    public static final String SIMPLE_CONSUMER_MAX_CACHES_NUM_CONFIG = "simpleconsumer.max.caches.num";
    private static final String SIMPLE_CONSUMER_MAX_CACHES_NUM_DOC = "Maximum number topic-partition combinations for which records are cached. If 0, then caching is disabled and extra records are thrown away. Cache improves performance if end user fetches records sequentially increasing offsets.";
    public static final int SIMPLE_CONSUMER_MAX_CACHES_NUM_DEFAULT = 0;
    public static final String SIMPLE_CONSUMER_CACHE_MAX_RECORDS_CONFIG = "simpleconsumer.cache.max.records";
    private static final String SIMPLE_CONSUMER_CACHE_MAX_RECORDS_DOC = "Maximum number of records that can be stored for a specific topic-partition combination. Records with higher offsets replace records with lower ones Must be greater that 0.";
    public static final int SIMPLE_CONSUMER_CACHE_MAX_RECORDS_DEFAULT = 1000;

    // ---- Producer stream buffering -------------------------------------------------------
    public static final String STREAM_BUFFER_MAX_TIME_CONFIG = "producer.streams.buffer.max.time.ms";
    private static final String STREAM_BUFFER_MAX_TIME_DOC = "Messages are buffered in the producer for at most the specified time. A thread will flush all the messages that have been buffered for more than the time specified";
    public static final String STREAM_BUFFER_MAX_TIME_DEFAULT = "1";

    // ---- Defaults for settings inherited from RestConfig ---------------------------------
    // These values match the defaults that baseKafkaRestConfigDef() passes to defineOverride
    // for "listeners", "port" and "metrics.jmx.prefix".
    private static final String KAFKAREST_LISTENERS_DEFAULT = "";
    private static final int KAFKAREST_PORT_DEFAULT = 8082;
    private static final String METRICS_JMX_PREFIX_DEFAULT_OVERRIDE = "kafka.rest";
    // ---- "client."-prefixed keys ---------------------------------------------------------
    // Properties whose names start with "client." are copied, with the prefix stripped, into
    // the producer, consumer and admin property sets (see addPropertiesWithPrefix).
    public static final String KAFKACLIENT_ZK_SESSION_TIMEOUT_MS_CONFIG = "client.zk.session.timeout.ms";
    public static final String KAFKACLIENT_TIMEOUT_CONFIG = "client.timeout.ms";
    public static final String KAFKACLIENT_INIT_TIMEOUT_CONFIG = "client.init.timeout.ms";
    // NOTE(review): "zookeeper.set.acl" (key, doc and default below) is declared here but is
    // not registered by baseKafkaRestConfigDef() in this class.
    public static final String ZOOKEEPER_SET_ACL_CONFIG = "zookeeper.set.acl";
    public static final String KAFKACLIENT_SECURITY_PROTOCOL_CONFIG = "client.security.protocol";
    public static final String KAFKACLIENT_SSL_TRUSTSTORE_LOCATION_CONFIG = "client.ssl.truststore.location";
    public static final String KAFKACLIENT_SSL_TRUSTSTORE_PASSWORD_CONFIG = "client.ssl.truststore.password";
    public static final String KAFKACLIENT_SSL_KEYSTORE_LOCATION_CONFIG = "client.ssl.keystore.location";
    public static final String KAFKACLIENT_SSL_TRUSTSTORE_TYPE_CONFIG = "client.ssl.truststore.type";
    public static final String KAFKACLIENT_SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "client.ssl.trustmanager.algorithm";
    public static final String KAFKACLIENT_SSL_KEYSTORE_PASSWORD_CONFIG = "client.ssl.keystore.password";
    public static final String KAFKACLIENT_SSL_KEYSTORE_TYPE_CONFIG = "client.ssl.keystore.type";
    public static final String KAFKACLIENT_SSL_KEYMANAGER_ALGORITHM_CONFIG = "client.ssl.keymanager.algorithm";
    public static final String KAFKACLIENT_SSL_KEY_PASSWORD_CONFIG = "client.ssl.key.password";
    public static final String KAFKACLIENT_SSL_ENABLED_PROTOCOLS_CONFIG = "client.ssl.enabled.protocols";
    public static final String KAFKACLIENT_SSL_PROTOCOL_CONFIG = "client.ssl.protocol";
    public static final String KAFKACLIENT_SSL_PROVIDER_CONFIG = "client.ssl.provider";
    public static final String KAFKACLIENT_SSL_CIPHER_SUITES_CONFIG = "client.ssl.cipher.suites";
    public static final String KAFKACLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "client.ssl.endpoint.identification.algorithm";
    public static final String KAFKACLIENT_SASL_KERBEROS_SERVICE_NAME_CONFIG = "client.sasl.kerberos.service.name";
    public static final String KAFKACLIENT_SASL_MECHANISM_CONFIG = "client.sasl.mechanism";
    public static final String KAFKACLIENT_SASL_KERBEROS_KINIT_CMD_CONFIG = "client.sasl.kerberos.kinit.cmd";
    public static final String KAFKACLIENT_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_CONFIG = "client.sasl.kerberos.min.time.before.relogin";
    public static final String KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_JITTER_CONFIG = "client.sasl.kerberos.ticket.renew.jitter";
    public static final String KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_CONFIG = "client.sasl.kerberos.ticket.renew.window.factor";
    // Key for a pluggable REST resource extension class name.
    public static final String KAFKA_REST_RESOURCE_EXTENSION_CONFIG = "kafka.rest.resource.extension.class";

    // ---- Documentation strings for the keys above ----------------------------------------
    // NOTE(review): several names below use the prefix KAFAKSTORE_, which looks like a typo;
    // they are protected, so renaming them could break subclasses outside this file.
    protected static final String KAFKACLIENT_ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout";
    protected static final String KAFKACLIENT_INIT_TIMEOUT_DOC = "The timeout for initialization of the Kafka store, including creation of the Kafka topic that stores schema data.";
    protected static final String KAFKACLIENT_TIMEOUT_DOC = "The timeout for an operation on the Kafka store";
    protected static final String ZOOKEEPER_SET_ACL_DOC = "Whether or not to set an ACL in ZooKeeper when znodes are created and ZooKeeper SASL authentication is configured. IMPORTANT: if set to `true`, the SASL principal must be the same as the Kafka brokers.";
    protected static final String KAFKACLIENT_SECURITY_PROTOCOL_DOC = "The security protocol to use when connecting with Kafka, the underlying persistent storage. Values can be `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, or `SASL_SSL`.";
    protected static final String KAFKACLIENT_SSL_TRUSTSTORE_LOCATION_DOC = "The location of the SSL trust store file.";
    protected static final String KAFKACLIENT_SSL_TRUSTSTORE_PASSWORD_DOC = "The password to access the trust store.";
    protected static final String KAFAKSTORE_SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store.";
    protected static final String KAFKACLIENT_SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by the trust manager factory for SSL connections.";
    protected static final String KAFKACLIENT_SSL_KEYSTORE_LOCATION_DOC = "The location of the SSL keystore file.";
    protected static final String KAFKACLIENT_SSL_KEYSTORE_PASSWORD_DOC = "The password to access the keystore.";
    protected static final String KAFAKSTORE_SSL_KEYSTORE_TYPE_DOC = "The file format of the keystore.";
    protected static final String KAFKACLIENT_SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections.";
    protected static final String KAFKACLIENT_SSL_KEY_PASSWORD_DOC = "The password of the key contained in the keystore.";
    protected static final String KAFAKSTORE_SSL_ENABLED_PROTOCOLS_DOC = "Protocols enabled for SSL connections.";
    protected static final String KAFAKSTORE_SSL_PROTOCOL_DOC = "The SSL protocol used.";
    protected static final String KAFAKSTORE_SSL_PROVIDER_DOC = "The name of the security provider used for SSL.";
    protected static final String KAFKACLIENT_SSL_CIPHER_SUITES_DOC = "A list of cipher suites used for SSL.";
    protected static final String KAFKACLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate the server hostname using the server certificate.";
    public static final String KAFKACLIENT_SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that the Kafka client runs as. This can be defined either in the JAAS config file or here.";
    public static final String KAFKACLIENT_SASL_MECHANISM_DOC = "The SASL mechanism used for Kafka connections. GSSAPI is the default.";
    public static final String KAFKACLIENT_SASL_KERBEROS_KINIT_CMD_DOC = "The Kerberos kinit command path.";
    public static final String KAFKACLIENT_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "The login time between refresh attempts.";
    public static final String KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_JITTER_DOC = "The percentage of random jitter added to the renewal time.";
    public static final String KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC = "Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.";
    public static final String KAFKA_REST_RESOURCE_EXTENSION_DOC = "Fully qualified class name of a  valid Implementation of the interface RestResourceExtensionThis can be used to inject user defined resources like filters. Typically used to add custom capability like logging, security, etc  ";
    private static final boolean ZOOKEEPER_SET_ACL_DEFAULT = false;
    // ---- Streams and impersonation -------------------------------------------------------
    // "streams.default.stream": a non-empty value makes isDefaultStreamSet() return true.
    public static final String STREAMS_DEFAULT_STREAM_CONFIG = "streams.default.stream";
    private static final String STREAMS_DEFAULT_STREAM_DOC = "The default stream the consumer should poll messages from andthe producer should send messages to, if the topic name does not specify the stream path and the property has a valid value, then this topic name is looked in the default stream.";
    private static final String STREAMS_DEFAULT_STREAM_DEFAULT = "";
    // "rest.proxy.enable.doAs": backs isImpersonationEnabled(); on by default.
    public static final String REST_PROXY_IMPERSONATION = "rest.proxy.enable.doAs";
    private static final String REST_PROXY_IMPERSONATION_DOC = "Set to true if you want impersonations for streams to be enabled, if false - all manipulation will be performed from admin of cluster user";
    public static final boolean REST_PROXY_IMPERSONATION_DEFAULT = true;

    // ---- Overridden defaults for the inherited SSL settings ------------------------------
    protected static final String SSL_PROTOCOL_DEFAULT_OVERRIDE = "TLSv1.2";
    protected static final String SSL_ENABLED_PROTOCOLS_DEFAULT_OVERRIDE = "TLSv1.1,TLSv1.2";

    // ---- Producer cache ------------------------------------------------------------------
    public static final String PRODUCERS_MAX_CACHES_NUM_CONFIG = "producers.max.caches.num";
    private static final String PRODUCERS_MAX_CACHES_NUM_DOC = "Maximum number user names for which producers are cached. If 0, then caching is disabled and producer will be created for each request.";
    public static final int PRODUCERS_MAX_CACHES_NUM_DEFAULT = 20;

    // Definition built once during class initialisation; used by the (Properties, Time)
    // constructor and printed by main().
    private static final ConfigDef config = KafkaRestConfig.baseKafkaRestConfigDef();

    // ---- Per-instance state, all assigned in the (ConfigDef, Properties, Time) constructor ----
    // Clock passed in at construction.
    private Time time;
    // Unconditionally set to true by the constructor.
    private boolean isStreams;
    // True when the configured "streams.default.stream" value is not the empty string.
    private boolean defaultStreamSet;
    // Value of "rest.proxy.enable.doAs" at construction time.
    private boolean isImpersonationEnabled;
    // The Properties object handed to the constructor (the same reference, not a copy).
    private Properties originalProperties;

    protected static ConfigDef baseKafkaRestConfigDef() {
        return KafkaRestConfig.baseConfigDef().defineOverride("port", ConfigDef.Type.INT, (Object)8082, ConfigDef.Importance.LOW, "DEPRECATED: port to listen on for new HTTP connections. Use listeners instead.").defineOverride("listeners", ConfigDef.Type.LIST, (Object)"", ConfigDef.Importance.HIGH, "List of listeners. http and https are supported. Each listener must include the protocol, hostname, and port. For example: http://myhost:8080, https://0.0.0.0:8081").defineOverride("response.mediatype.preferred", ConfigDef.Type.LIST, Versions.PREFERRED_RESPONSE_TYPES, ConfigDef.Importance.LOW, "An ordered list of the server's preferred media types used for responses, from most preferred to least.").defineOverride("response.mediatype.default", ConfigDef.Type.STRING, (Object)"application/vnd.kafka.v1+json", ConfigDef.Importance.LOW, "The default response media type that should be used if no specify types are requested in an Accept header.").defineOverride("metrics.jmx.prefix", ConfigDef.Type.STRING, (Object)METRICS_JMX_PREFIX_DEFAULT_OVERRIDE, ConfigDef.Importance.LOW, "Prefix to apply to metric names for the default JMX reporter.").defineOverride("ssl.protocol", ConfigDef.Type.STRING, (Object)SSL_PROTOCOL_DEFAULT_OVERRIDE, ConfigDef.Importance.MEDIUM, "The SSL protocol used to generate the SslContextFactory.").defineOverride("ssl.enabled.protocols", ConfigDef.Type.LIST, (Object)SSL_ENABLED_PROTOCOLS_DEFAULT_OVERRIDE, ConfigDef.Importance.MEDIUM, "The list of protocols enabled for SSL connections. Comma-separated list. Leave blank to use Jetty's defaults.").defineOverride("authentication.method", ConfigDef.Type.STRING, (Object)"BASIC", (ConfigDef.Validator)AUTHENTICATION_METHOD_VALIDATOR, ConfigDef.Importance.LOW, "Method of authentication. Must be BASIC to enable authentication. 
You must supply a valid JAAS config file for the 'java.security.auth.login.config' system property for the appropriate authentication provider.").defineOverride("authentication.realm", ConfigDef.Type.STRING, (Object)"jpamLogin", ConfigDef.Importance.LOW, "Security realm to be used in authentication.").defineOverride("authentication.roles", ConfigDef.Type.LIST, (Object)AUTHENTICATION_ROLES_DEFAULT, ConfigDef.Importance.LOW, "Valid roles to authenticate against.").define(ID_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.HIGH, ID_CONFIG_DOC).define(HOST_NAME_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.MEDIUM, HOST_NAME_DOC).define(ZOOKEEPER_CONNECT_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.HIGH, ZOOKEEPER_CONNECT_DOC).define(BOOTSTRAP_SERVERS_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.HIGH, BOOTSTRAP_SERVERS_DOC).define(SCHEMA_REGISTRY_URL_CONFIG, ConfigDef.Type.STRING, (Object)SCHEMA_REGISTRY_URL_DEFAULT, ConfigDef.Importance.HIGH, SCHEMA_REGISTRY_URL_DOC).define(SIMPLE_CONSUMER_MAX_POLL_TIME_CONFIG, ConfigDef.Type.INT, (Object)1000, ConfigDef.Importance.LOW, SIMPLE_CONSUMER_MAX_POLL_TIME_DOC).define(SIMPLE_CONSUMER_MAX_CACHES_NUM_CONFIG, ConfigDef.Type.INT, (Object)0, ConfigDef.Importance.MEDIUM, SIMPLE_CONSUMER_MAX_CACHES_NUM_DOC).define(SIMPLE_CONSUMER_CACHE_MAX_RECORDS_CONFIG, ConfigDef.Type.INT, (Object)1000, ConfigDef.Importance.MEDIUM, SIMPLE_CONSUMER_CACHE_MAX_RECORDS_DOC).define(PRODUCER_THREADS_CONFIG, ConfigDef.Type.INT, (Object)PRODUCER_THREADS_DEFAULT, ConfigDef.Importance.LOW, PRODUCER_THREADS_DOC).define(CONSUMER_ITERATOR_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)"1", ConfigDef.Importance.LOW, CONSUMER_ITERATOR_TIMEOUT_MS_DOC).define(CONSUMER_ITERATOR_BACKOFF_MS_CONFIG, ConfigDef.Type.INT, (Object)CONSUMER_ITERATOR_BACKOFF_MS_DEFAULT, ConfigDef.Importance.LOW, CONSUMER_ITERATOR_BACKOFF_MS_DOC).define(CONSUMER_REQUEST_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)"1", 
ConfigDef.Importance.MEDIUM, CONSUMER_REQUEST_TIMEOUT_MS_DOC).define(CONSUMER_REQUEST_MAX_BYTES_CONFIG, ConfigDef.Type.LONG, (Object)0x4000000L, ConfigDef.Importance.MEDIUM, CONSUMER_REQUEST_MAX_BYTES_DOC).define(CONSUMER_THREADS_CONFIG, ConfigDef.Type.INT, (Object)"1", ConfigDef.Importance.MEDIUM, CONSUMER_THREADS_DOC).define(CONSUMER_INSTANCE_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)CONSUMER_INSTANCE_TIMEOUT_MS_DEFAULT, ConfigDef.Importance.LOW, CONSUMER_INSTANCE_TIMEOUT_MS_DOC).define(SIMPLE_CONSUMER_MAX_POOL_SIZE_CONFIG, ConfigDef.Type.INT, (Object)SIMPLE_CONSUMER_MAX_POOL_SIZE_DEFAULT, ConfigDef.Importance.MEDIUM, SIMPLE_CONSUMER_MAX_POOL_SIZE_DOC).define(SIMPLE_CONSUMER_POOL_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)SIMPLE_CONSUMER_POOL_TIMEOUT_MS_DEFAULT, ConfigDef.Importance.LOW, SIMPLE_CONSUMER_POOL_TIMEOUT_MS_DOC).define(KAFKACLIENT_ZK_SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)30000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, KAFKACLIENT_ZK_SESSION_TIMEOUT_MS_DOC).define(KAFKACLIENT_INIT_TIMEOUT_CONFIG, ConfigDef.Type.INT, (Object)60000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, KAFKACLIENT_INIT_TIMEOUT_DOC).define(KAFKACLIENT_TIMEOUT_CONFIG, ConfigDef.Type.INT, (Object)500, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, KAFKACLIENT_TIMEOUT_DOC).define(KAFKACLIENT_SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, (Object)SecurityProtocol.PLAINTEXT.toString(), ConfigDef.Importance.MEDIUM, KAFKACLIENT_SECURITY_PROTOCOL_DOC).define(KAFKACLIENT_SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.HIGH, KAFKACLIENT_SSL_TRUSTSTORE_LOCATION_DOC).define(KAFKACLIENT_SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, (Object)"", ConfigDef.Importance.HIGH, KAFKACLIENT_SSL_TRUSTSTORE_PASSWORD_DOC).define(KAFKACLIENT_SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, (Object)"JKS", 
ConfigDef.Importance.MEDIUM, KAFAKSTORE_SSL_TRUSTSTORE_TYPE_DOC).define(KAFKACLIENT_SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, (Object)"PKIX", ConfigDef.Importance.LOW, KAFKACLIENT_SSL_TRUSTMANAGER_ALGORITHM_DOC).define(KAFKACLIENT_SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.HIGH, KAFKACLIENT_SSL_KEYSTORE_LOCATION_DOC).define(KAFKACLIENT_SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, (Object)"", ConfigDef.Importance.HIGH, KAFKACLIENT_SSL_KEYSTORE_PASSWORD_DOC).define(KAFKACLIENT_SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, (Object)"JKS", ConfigDef.Importance.MEDIUM, KAFAKSTORE_SSL_KEYSTORE_TYPE_DOC).define(KAFKACLIENT_SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, (Object)"SunX509", ConfigDef.Importance.LOW, KAFKACLIENT_SSL_KEYMANAGER_ALGORITHM_DOC).define(KAFKACLIENT_SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, (Object)"", ConfigDef.Importance.HIGH, KAFKACLIENT_SSL_KEY_PASSWORD_DOC).define(KAFKACLIENT_SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.STRING, (Object)"TLSv1.2,TLSv1.1,TLSv1", ConfigDef.Importance.MEDIUM, KAFAKSTORE_SSL_ENABLED_PROTOCOLS_DOC).define(KAFKACLIENT_SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, (Object)"TLS", ConfigDef.Importance.MEDIUM, KAFAKSTORE_SSL_PROTOCOL_DOC).define(KAFKACLIENT_SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.MEDIUM, KAFAKSTORE_SSL_PROVIDER_DOC).define(KAFKACLIENT_SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, KAFKACLIENT_SSL_CIPHER_SUITES_DOC).define(KAFKACLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, KAFKACLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC).define(KAFKACLIENT_SASL_KERBEROS_SERVICE_NAME_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.MEDIUM, KAFKACLIENT_SASL_KERBEROS_SERVICE_NAME_DOC).define(KAFKACLIENT_SASL_MECHANISM_CONFIG, ConfigDef.Type.STRING, (Object)"GSSAPI", 
ConfigDef.Importance.MEDIUM, KAFKACLIENT_SASL_MECHANISM_DOC).define(KAFKACLIENT_SASL_KERBEROS_KINIT_CMD_CONFIG, ConfigDef.Type.STRING, (Object)"/usr/bin/kinit", ConfigDef.Importance.LOW, KAFKACLIENT_SASL_KERBEROS_KINIT_CMD_DOC).define(KAFKACLIENT_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_CONFIG, ConfigDef.Type.LONG, (Object)60000, ConfigDef.Importance.LOW, KAFKACLIENT_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC).define(KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_JITTER_CONFIG, ConfigDef.Type.DOUBLE, (Object)0.05, ConfigDef.Importance.LOW, KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_JITTER_DOC).define(KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_CONFIG, ConfigDef.Type.DOUBLE, (Object)0.8, ConfigDef.Importance.LOW, KAFKACLIENT_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC).define(KAFKA_REST_RESOURCE_EXTENSION_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, KAFKA_REST_RESOURCE_EXTENSION_DOC).define(STREAMS_DEFAULT_STREAM_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.MEDIUM, STREAMS_DEFAULT_STREAM_DOC).define(STREAM_BUFFER_MAX_TIME_CONFIG, ConfigDef.Type.INT, (Object)"1", ConfigDef.Importance.MEDIUM, STREAM_BUFFER_MAX_TIME_DOC).define(REST_PROXY_IMPERSONATION, ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.MEDIUM, REST_PROXY_IMPERSONATION_DOC).define(PRODUCERS_MAX_CACHES_NUM_CONFIG, ConfigDef.Type.INT, (Object)20, ConfigDef.Importance.MEDIUM, PRODUCERS_MAX_CACHES_NUM_DOC);
    }

    /**
     * Creates a configuration from an empty {@link Properties} object, so every setting
     * takes the default from the class-level definition.
     *
     * @throws RestConfigException propagated from the delegated constructor
     */
    public KafkaRestConfig() throws RestConfigException {
        this(new Properties());
    }

    /**
     * Creates a configuration from a properties file.
     *
     * @param propsFile path handed to the inherited {@code getPropsFromFile} helper
     * @throws RestConfigException propagated from loading the file or from the delegated constructor
     */
    public KafkaRestConfig(String propsFile) throws RestConfigException {
        this(getPropsFromFile(propsFile));
    }

    /**
     * Creates a configuration from the given properties, using a new {@link SystemTime}
     * as the clock.
     *
     * @param props user-supplied settings
     * @throws RestConfigException propagated from the delegated constructor
     */
    public KafkaRestConfig(Properties props) throws RestConfigException {
        this(props, new SystemTime());
    }

    /**
     * Creates a configuration from the given properties and clock, validated against the
     * class-level definition held in {@code config}.
     *
     * @param props user-supplied settings
     * @param time clock later returned by {@link #getTime()}
     * @throws RestConfigException propagated from the delegated constructor
     */
    public KafkaRestConfig(Properties props, Time time) throws RestConfigException {
        this(config, props, time);
    }

    public KafkaRestConfig(ConfigDef configDef, Properties props, Time time) throws RestConfigException {
        super(configDef, (Map)props);
        this.originalProperties = props;
        this.time = time;
        this.isStreams = true;
        this.defaultStreamSet = !"".equals(this.getString(STREAMS_DEFAULT_STREAM_CONFIG));
        this.isImpersonationEnabled = this.getBoolean(REST_PROXY_IMPERSONATION);
    }

    /**
     * @return the clock supplied when this configuration was constructed
     */
    public Time getTime() {
        return time;
    }

    /**
     * @return the streams flag; the constructor in this class always sets it to {@code true}
     */
    public boolean isStreams() {
        return isStreams;
    }

    /**
     * @return the value of {@code rest.proxy.enable.doAs} captured at construction
     */
    public boolean isImpersonationEnabled() {
        return isImpersonationEnabled;
    }

    /**
     * @return {@code true} when {@code streams.default.stream} was not the empty string at
     *     construction
     */
    public boolean isDefaultStreamSet() {
        return defaultStreamSet;
    }

    /**
     * Returns the properties this configuration was built from.
     *
     * @return the very {@link Properties} instance passed to the constructor, not a copy
     */
    public Properties getOriginalProperties() {
        return originalProperties;
    }

    /**
     * Copies every original property whose name carries none of the scoped prefixes
     * ({@code ssl.}, {@code sasl.}, {@code client.}, {@code producer.}, {@code consumer.})
     * into {@code props}, keeping the name unchanged.
     *
     * @param props target that receives the copied entries
     * @return the same {@code props} instance
     */
    private Properties addExistingV1Properties(Properties props) {
        for (Map.Entry<Object, Object> entry : this.originalProperties.entrySet()) {
            String key = (String)entry.getKey();
            boolean scoped = key.startsWith("ssl.")
                    || key.startsWith("sasl.")
                    || key.startsWith("client.")
                    || key.startsWith("producer.")
                    || key.startsWith("consumer.");
            if (!scoped) {
                props.setProperty(key, this.originalProperties.getProperty(key));
            }
        }
        return props;
    }

    /**
     * Copies every original property whose name starts with {@code prefix} into
     * {@code props}, dropping the prefix from the name.
     *
     * @param prefix leading text to match and strip, for example {@code "client."}
     * @param props target that receives the copied entries
     * @return the same {@code props} instance
     */
    private Properties addPropertiesWithPrefix(String prefix, Properties props) {
        final int skip = prefix.length();
        for (Map.Entry<Object, Object> entry : this.originalProperties.entrySet()) {
            String key = (String)entry.getKey();
            if (key.startsWith(prefix)) {
                String stripped = key.substring(skip);
                props.setProperty(stripped, this.originalProperties.getProperty(key));
            }
        }
        return props;
    }

    /**
     * Assembles the producer property set.
     *
     * <p>Layers are applied in order, later ones overwriting earlier ones on a name clash:
     * un-prefixed original properties, then {@code client.*}, then {@code producer.*}
     * (the latter two with their prefix removed).
     *
     * @return a new {@link Properties} object
     */
    public Properties getProducerProperties() {
        Properties merged = addExistingV1Properties(new Properties());
        addPropertiesWithPrefix("client.", merged);
        return addPropertiesWithPrefix("producer.", merged);
    }

    /**
     * Assembles the consumer property set from {@code client.*} and then {@code consumer.*}
     * original properties, each with its prefix removed; the later layer wins on a clash.
     *
     * @return a new {@link Properties} object
     */
    public Properties getConsumerProperties() {
        Properties merged = addPropertiesWithPrefix("client.", new Properties());
        return addPropertiesWithPrefix("consumer.", merged);
    }

    /**
     * Assembles the admin-client property set from {@code client.*} and then {@code admin.*}
     * original properties, each with its prefix removed; the later layer wins on a clash.
     *
     * @return a new {@link Properties} object
     */
    public Properties getAdminProperties() {
        Properties adminProps = addPropertiesWithPrefix("client.", new Properties());
        return addPropertiesWithPrefix("admin.", adminProps);
    }

    /**
     * Resolves the broker list that Kafka clients should bootstrap from.
     *
     * <ol>
     *   <li>When the streams flag is set, returns the empty string.</li>
     *   <li>Otherwise, when {@code bootstrap.servers} is not blank, returns it unchanged.</li>
     *   <li>Otherwise connects to ZooKeeper using {@code zookeeper.connect} and builds the
     *       list from the brokers registered there.</li>
     * </ol>
     *
     * <p>The ZooKeeper handle is opened only for the third case and is always closed before
     * returning, including when the lookup throws.
     *
     * @return a comma-separated broker list, possibly empty
     */
    public String bootstrapBrokers() {
        if (this.isStreams) {
            return "";
        }
        String bootstrapServersConfig = this.getString(BOOTSTRAP_SERVERS_CONFIG);
        if (StringUtil.isNotBlank(bootstrapServersConfig)) {
            return bootstrapServersConfig;
        }
        // The same value serves as both the session timeout and the connection timeout.
        int zkSessionTimeoutMs = this.getInt(KAFKACLIENT_ZK_SESSION_TIMEOUT_MS_CONFIG);
        ZkUtils zkUtils = null;
        try {
            zkUtils = ZkUtils.apply(this.getString(ZOOKEEPER_CONNECT_CONFIG), zkSessionTimeoutMs, zkSessionTimeoutMs, JaasUtils.isZkSecurityEnabled());
            return this.getBootstrapBrokers(zkUtils);
        } finally {
            // ZkUtils.apply may throw before assignment, hence the null guard.
            if (zkUtils != null) {
                zkUtils.close();
            }
        }
    }

    /**
     * Builds a comma-separated list of {@code PROTOCOL://host:port} entries, one for every
     * endpoint of every broker that ZooKeeper reports for the cluster.
     *
     * <p>An endpoint without a host is rendered as {@code PROTOCOL://:port}.
     *
     * @param zkUtils open ZooKeeper handle; the caller remains responsible for closing it
     * @return the joined endpoint list, or the empty string when there are no endpoints
     */
    private String getBootstrapBrokers(ZkUtils zkUtils) {
        Seq<Broker> brokerSeq = zkUtils.getAllBrokersInCluster();
        List<Broker> brokers = JavaConversions.seqAsJavaList(brokerSeq);
        StringBuilder bootstrapBrokers = new StringBuilder();
        for (Broker broker : brokers) {
            for (EndPoint ep : JavaConversions.asJavaCollection(broker.endPoints())) {
                if (bootstrapBrokers.length() > 0) {
                    bootstrapBrokers.append(',');
                }
                String hostport = ep.host() == null ? ":" + ep.port() : Utils.formatAddress(ep.host(), ep.port());
                bootstrapBrokers.append(ep.securityProtocol()).append("://").append(hostport);
            }
        }
        return bootstrapBrokers.toString();
    }

    /**
     * Prints the reStructuredText rendering of the class-level configuration definition to
     * standard output, without a trailing newline.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String rst = config.toRst();
        System.out.print(rst);
    }
}

