/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.leaderelector.kafka;

import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.ClientConfig;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.GenericSRCoordinator;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.MarlinSRCoordinator;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryCoordinator;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryProtocol;
import io.confluent.kafka.schemaregistry.leaderelector.kafka.SchemaRegistryRebalanceListener;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.LeaderElector;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.mapr.util.MaprKafkaUtils;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LeaderElector} that joins a schema registry group through a {@link GenericSRCoordinator}
 * and forwards the outcome of every rebalance to the owning {@link KafkaSchemaRegistry}.
 *
 * <p>The constructor picks the coordinator implementation: a {@link MarlinSRCoordinator} when the
 * MapR clusters file exists on disk, otherwise a {@link SchemaRegistryCoordinator} driven by a
 * Kafka {@link ConsumerNetworkClient}. {@link #init()} starts a single background thread that
 * polls the coordinator until {@link #close()} is called.
 *
 * <p>This class also acts as the {@link SchemaRegistryRebalanceListener} handed to the
 * coordinator, so {@link #onAssigned} and {@link #onRevoked} are invoked by the coordinator.
 */
public class KafkaGroupLeaderElector implements LeaderElector, SchemaRegistryRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(KafkaGroupLeaderElector.class);
    /** Source of the numeric suffix used to build a per-instance client id ("sr-N"). */
    private static final AtomicInteger SR_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.schema.registry";
    /** How long {@link #stop(boolean)} waits for the polling thread to exit. */
    private static final long EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 30L;

    private final int initTimeout;
    private final String clientId;
    /** Only set on the non-MapR path; stays {@code null} when the Marlin coordinator is used. */
    private ConsumerNetworkClient client;
    private final Metrics metrics;
    /** Only set on the non-MapR path; stays {@code null} when the Marlin coordinator is used. */
    private Metadata metadata;
    private final long retryBackoffMs;
    private final boolean stickyLeaderElection;
    private final GenericSRCoordinator coordinator;
    private final KafkaSchemaRegistry schemaRegistry;
    /** Raised by {@link #stop(boolean)}; checked by the polling loop and by {@link #close()}. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    /** Created in {@link #init()}; {@code null} until then. */
    private ExecutorService executor;
    /** Released by {@link #onAssigned} after the first successful assignment. */
    private final CountDownLatch joinedLatch = new CountDownLatch(1);

    /**
     * Builds the metrics registry and the group coordinator. No thread is started here; call
     * {@link #init()} to begin polling.
     *
     * @param config         schema registry configuration; {@code kafkastore.}-prefixed entries
     *                       are used to configure the Kafka client
     * @param myIdentity     identity of this instance, passed through to the coordinator
     * @param schemaRegistry registry that receives leader updates and supplies the metrics context
     * @throws SchemaRegistryInitializationException if anything fails during construction; any
     *         resources created up to that point are released first
     */
    public KafkaGroupLeaderElector(SchemaRegistryConfig config, SchemaRegistryIdentity myIdentity, KafkaSchemaRegistry schemaRegistry) throws SchemaRegistryInitializationException {
        try {
            this.schemaRegistry = schemaRegistry;
            this.clientId = "sr-" + SR_CLIENT_ID_SEQUENCE.getAndIncrement();
            LinkedHashMap<String, String> metricsTags = new LinkedHashMap<String, String>();
            metricsTags.put("client-id", this.clientId);
            this.stickyLeaderElection = config.getBoolean("leader.election.sticky");
            long sampleWindowMs = config.getLong("metrics.sample.window.ms");
            MetricConfig metricConfig = new MetricConfig()
                    .samples(config.getInt("metrics.num.samples").intValue())
                    .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS)
                    .tags(metricsTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
            // A JMX reporter is always added on top of whatever reporters are configured.
            reporters.add(new JmxReporter());
            MetricsContext metricsContext = schemaRegistry.getMetricsContainer().getMetricsContext();
            Time time = Time.SYSTEM;
            ClientConfig clientConfig = new ClientConfig(config.originalsWithPrefix("kafkastore."), false);
            this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
            this.retryBackoffMs = clientConfig.getLong("retry.backoff.ms");
            String groupId = config.getString("schema.registry.group.id");
            // The presence of the MapR clusters file decides which coordinator is used.
            boolean isMapr = Files.exists(Paths.get(MaprKafkaUtils.MAPR_CLUSTERS_FILE));
            if (isMapr) {
                this.coordinator = new MarlinSRCoordinator(groupId, config, myIdentity, this, schemaRegistry.getMetricsContainer().getNodeCountMetric(), this.stickyLeaderElection);
            } else {
                LogContext logContext = new LogContext("[Schema registry clientId=" + this.clientId + ", groupId=" + groupId + "] ");
                this.metadata = new Metadata(this.retryBackoffMs, clientConfig.getLong("metadata.max.age.ms").longValue(), logContext, new ClusterResourceListeners());
                List<String> bootstrapServers = config.getList("kafkastore.bootstrap.servers");
                this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(bootstrapServers, clientConfig.getString("client.dns.lookup")));
                String metricGrpPrefix = JMX_PREFIX;
                ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(clientConfig, time, logContext);
                long maxIdleMs = clientConfig.getLong("connections.max.idle.ms");
                Selector selector = new Selector(maxIdleMs, this.metrics, time, metricGrpPrefix, channelBuilder, logContext);
                // NOTE(review): the literals 100, 10000L and 127000L are presumably the max
                // in-flight requests per connection and the socket connection setup timeouts;
                // confirm against the NetworkClient constructor of the Kafka version in use.
                NetworkClient netClient = new NetworkClient(
                        selector,
                        this.metadata,
                        this.clientId,
                        100,
                        clientConfig.getLong("reconnect.backoff.ms").longValue(),
                        clientConfig.getLong("reconnect.backoff.max.ms").longValue(),
                        clientConfig.getInt("send.buffer.bytes").intValue(),
                        clientConfig.getInt("receive.buffer.bytes").intValue(),
                        clientConfig.getInt("request.timeout.ms").intValue(),
                        10000L,
                        127000L,
                        time,
                        true,
                        new ApiVersions(),
                        logContext);
                this.client = new ConsumerNetworkClient(logContext, netClient, this.metadata, time, this.retryBackoffMs, clientConfig.getInt("request.timeout.ms").intValue(), Integer.MAX_VALUE);
                this.coordinator = new SchemaRegistryCoordinator(logContext, this.client, groupId, config.getInt("kafkagroup.rebalance.timeout.ms"), config.getInt("kafkagroup.session.timeout.ms"), config.getInt("kafkagroup.heartbeat.interval.ms"), this.metrics, metricGrpPrefix, time, this.retryBackoffMs, myIdentity, this, schemaRegistry.getMetricsContainer().getNodeCountMetric(), this.stickyLeaderElection);
            }
            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.initTimeout = config.getInt("kafkastore.init.timeout.ms");
            log.debug("Schema registry group member created");
        }
        catch (Throwable t) {
            // Release whatever was already constructed, ignoring close failures, then report
            // the original cause.
            this.stop(true);
            throw new SchemaRegistryInitializationException("Failed to construct kafka consumer", t);
        }
    }

    /**
     * Starts the background polling thread and blocks until the first successful assignment has
     * been received or the configured {@code kafkastore.init.timeout.ms} elapses.
     *
     * <p>The polling thread is left running when this method fails; callers are expected to
     * invoke {@link #close()} to release it.
     *
     * @throws SchemaRegistryTimeoutException if no assignment arrives within the timeout
     * @throws SchemaRegistryStoreException   if the calling thread is interrupted while waiting
     */
    @Override
    public void init() throws SchemaRegistryTimeoutException, SchemaRegistryStoreException {
        log.debug("Initializing schema registry group member");
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    while (!KafkaGroupLeaderElector.this.stopped.get()) {
                        KafkaGroupLeaderElector.this.coordinator.poll(Integer.MAX_VALUE);
                    }
                }
                catch (WakeupException ignored) {
                    // Expected: stop() wakes the coordinator up to end the loop.
                }
                catch (Throwable t) {
                    log.error("Unexpected exception in schema registry group processing thread", t);
                }
            }
        });
        try {
            if (!this.joinedLatch.await(this.initTimeout, TimeUnit.MILLISECONDS)) {
                throw new SchemaRegistryTimeoutException("Timed out waiting for join group to complete");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SchemaRegistryStoreException("Interrupted while waiting for join group to complete", e);
        }
        log.debug("Schema registry group member initialized and joined group");
    }

    /**
     * Stops the polling thread and releases all resources. Does nothing when the elector has
     * already been stopped.
     *
     * @throws KafkaException   if closing one of the underlying resources fails
     * @throws RuntimeException if interrupted while waiting for the polling thread to exit
     */
    @Override
    public void close() {
        if (this.stopped.get()) {
            return;
        }
        this.stop(false);
    }

    /**
     * Rebalance callback: applies the election result to the schema registry.
     *
     * <p>Error code {@code 0} is treated as success: the leader (possibly {@code null}) is pushed
     * to the registry and {@link #init()} is unblocked. Error code {@code 1} means several
     * members advertised the same URL. Any other code is rejected as unknown.
     *
     * @param assignment the assignment produced by the coordination protocol
     * @param generation the group generation of this assignment (not used here)
     * @throws IllegalStateException if the assignment carries a non-zero error code
     */
    @Override
    public void onAssigned(SchemaRegistryProtocol.Assignment assignment, int generation) {
        log.info("Finished rebalance with leader election result: {}", assignment);
        try {
            switch (assignment.error()) {
                case 0: {
                    if (assignment.leaderIdentity() == null) {
                        log.error("No leader eligible schema registry instances joined the schema registry group. Rebalancing was successful and this instance can serve reads, but no writes can be processed.");
                    }
                    this.schemaRegistry.setLeader(assignment.leaderIdentity());
                    // Only reached when setLeader succeeded; a failure leaves init() waiting.
                    this.joinedLatch.countDown();
                    break;
                }
                case 1: {
                    throw new IllegalStateException("The schema registry group contained multiple members advertising the same URL. Verify that each instance has a unique, routable listener by setting the 'listeners' configuration. This error may happen if executing in containers where the default hostname is 'localhost'.");
                }
                default: {
                    throw new IllegalStateException("Unknown error returned from schema registry coordination protocol");
                }
            }
        }
        catch (SchemaRegistryException e) {
            log.error("Error when updating leader, we will not be able to forward requests to the leader", e);
        }
    }

    /**
     * Rebalance callback: invoked when a rebalance starts. Unless sticky leader election is
     * enabled, the current leader is cleared until the next assignment arrives.
     */
    @Override
    public void onRevoked() {
        log.info("Rebalance started");
        if (!this.stickyLeaderElection) {
            try {
                this.schemaRegistry.setLeader(null);
            }
            catch (SchemaRegistryException e) {
                log.error("Error when updating leader, we will not be able to forward requests to the leader", e);
            }
        }
    }

    /**
     * Shuts the elector down: signals the polling loop, waits for its thread, then closes the
     * coordinator, metrics and network client and unregisters the app-info bean.
     *
     * <p>Cleanup always runs to completion, even when the wait for the polling thread is
     * interrupted or times out, so that no resource is leaked.
     *
     * @param swallowException when {@code true}, failures while closing resources are only logged
     * @throws KafkaException   if a resource failed to close and {@code swallowException} is false
     * @throws RuntimeException if the calling thread was interrupted while waiting for the
     *                          polling thread; the interrupt flag is restored before throwing
     */
    private void stop(boolean swallowException) {
        log.info("Stopping the schema registry group member.");
        // Raise the flag before waking the coordinator so the polling loop also terminates
        // when poll() returns normally instead of throwing WakeupException.
        this.stopped.set(true);
        if (this.coordinator != null) {
            this.coordinator.wakeup();
        }
        InterruptedException interruption = null;
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    log.warn("Schema registry group processing thread did not exit within {} seconds; closing resources anyway", EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS);
                }
            }
            catch (InterruptedException e) {
                // Remember the interruption but finish the cleanup first; the flag is restored
                // afterwards so the close calls below are not disturbed by it.
                interruption = e;
            }
        }
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        KafkaGroupLeaderElector.closeQuietly(this.coordinator, "coordinator", firstException);
        KafkaGroupLeaderElector.closeQuietly(this.metrics, "consumer metrics", firstException);
        KafkaGroupLeaderElector.closeQuietly(this.client, "consumer network client", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);
        Throwable closeFailure = firstException.get();
        if (interruption != null) {
            Thread.currentThread().interrupt();
            RuntimeException failure = new RuntimeException("Interrupted waiting for schema registry group processing thread to exit", interruption);
            if (closeFailure != null) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
        if (closeFailure != null && !swallowException) {
            throw new KafkaException("Failed to stop the schema registry group member", closeFailure);
        }
        log.info("The schema registry group member has stopped.");
    }

    /**
     * Closes {@code closeable} if it is non-null, logging any failure and recording the first
     * one seen in {@code firstException}.
     *
     * @param closeable      resource to close; may be {@code null}
     * @param name           human-readable name used in the log message
     * @param firstException holder that keeps the first failure across several calls
     */
    private static void closeQuietly(AutoCloseable closeable, String name, AtomicReference<Throwable> firstException) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        }
        catch (Throwable t) {
            firstException.compareAndSet(null, t);
            log.error("Failed to close {} with type {}", name, closeable.getClass().getName(), t);
        }
    }
}

