/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.GenericWorkerCoordinator;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

public class WorkerGroupMember {
    private static final String JMX_PREFIX = "kafka.connect";
    private final Logger log;
    private final String clientId;
    private final ConsumerNetworkClient client;
    private final Metrics metrics;
    private final GenericWorkerCoordinator coordinator;
    private boolean stopped = false;

    public WorkerGroupMember(DistributedConfig config, String restUrl, ConfigBackingStore configStorage, WorkerRebalanceListener listener, Time time, String clientId, LogContext logContext) {
        try {
            this.clientId = clientId;
            this.log = logContext.logger(WorkerGroupMember.class);
            String configTopic = (String)config.originals().get("config.storage.topic");
            if (configTopic != null && (configTopic.startsWith("/") || configTopic.contains(":"))) {
                this.coordinator = (GenericWorkerCoordinator)GenericHFactory.getImplementorInstance((String)"com.mapr.kafka.eventstreams.impl.MarlinWorkerCoordinatorV10", (Object[])new Object[]{config, config.getString("group.id"), restUrl, configStorage, listener}, (Class[])new Class[]{DistributedConfig.class, String.class, String.class, ConfigBackingStore.class, WorkerRebalanceListener.class});
                this.client = null;
                this.metrics = null;
                return;
            }
            LinkedHashMap<String, String> metricsTags = new LinkedHashMap<String, String>();
            metricsTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples").intValue()).timeWindow(config.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS).tags(metricsTags);
            List reporters = CommonClientConfigs.metricsReporters((String)clientId, (AbstractConfig)config);
            HashMap<String, String> contextLabels = new HashMap<String, String>();
            contextLabels.putAll(config.originalsWithPrefix("metrics.context."));
            contextLabels.put("connect.kafka.cluster.id", config.kafkaClusterId());
            contextLabels.put("connect.group.id", config.getString("group.id"));
            KafkaMetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
            this.metrics = new Metrics(metricConfig, reporters, time, (MetricsContext)metricsContext);
            long retryBackoffMs = config.getLong("retry.backoff.ms");
            Metadata metadata = new Metadata(retryBackoffMs, config.getLong("metadata.max.age.ms").longValue(), logContext, new ClusterResourceListeners());
            List addresses = ClientUtils.parseAndValidateAddresses((List)config.getList("bootstrap.servers"), (String)config.getString("client.dns.lookup"));
            metadata.bootstrap(addresses);
            String metricGrpPrefix = "connect";
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder((AbstractConfig)config, (Time)time, (LogContext)logContext);
            NetworkClient netClient = new NetworkClient((Selectable)new Selector(config.getLong("connections.max.idle.ms").longValue(), this.metrics, time, metricGrpPrefix, channelBuilder, logContext), metadata, clientId, 100, config.getLong("reconnect.backoff.ms").longValue(), config.getLong("reconnect.backoff.max.ms").longValue(), config.getInt("send.buffer.bytes").intValue(), config.getInt("receive.buffer.bytes").intValue(), config.getInt("request.timeout.ms").intValue(), config.getLong("socket.connection.setup.timeout.ms").longValue(), config.getLong("socket.connection.setup.timeout.max.ms").longValue(), time, true, new ApiVersions(), logContext);
            this.client = new ConsumerNetworkClient(logContext, (KafkaClient)netClient, metadata, time, retryBackoffMs, config.getInt("request.timeout.ms").intValue(), Integer.MAX_VALUE);
            this.coordinator = new WorkerCoordinator(new GroupRebalanceConfig((AbstractConfig)config, GroupRebalanceConfig.ProtocolType.CONNECT), logContext, this.client, this.metrics, metricGrpPrefix, time, restUrl, configStorage, listener, ConnectProtocolCompatibility.compatibility(config.getString("connect.protocol")), config.getInt("scheduled.rebalance.max.delay.ms"));
            AppInfoParser.registerAppInfo((String)JMX_PREFIX, (String)clientId, (Metrics)this.metrics, (long)time.milliseconds());
            this.log.debug("Connect group member created");
        }
        catch (Throwable t) {
            this.stop(true);
            throw new KafkaException("Failed to construct kafka consumer", t);
        }
    }

    public void stop() {
        if (this.stopped) {
            return;
        }
        this.stop(false);
    }

    public void ensureActive() {
        this.coordinator.poll(0L);
    }

    public void poll(long timeout) {
        if (timeout < 0L) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        this.coordinator.poll(timeout);
    }

    public void wakeup() {
        this.coordinator.wakeup();
    }

    public String memberId() {
        return this.coordinator.memberId();
    }

    public void requestRejoin() {
        this.coordinator.requestRejoin("connect worker requested rejoin");
    }

    public void maybeLeaveGroup(String leaveReason) {
        this.coordinator.maybeLeaveGroup(leaveReason);
    }

    public String ownerUrl(String connector) {
        return this.coordinator.ownerUrl(connector);
    }

    public String ownerUrl(ConnectorTaskId task) {
        return this.coordinator.ownerUrl(task);
    }

    public short currentProtocolVersion() {
        return this.coordinator.currentProtocolVersion();
    }

    public void revokeAssignment(ExtendedAssignment assignment) {
        this.coordinator.revokeAssignment(assignment);
    }

    private void stop(boolean swallowException) {
        this.log.trace("Stopping the Connect group member.");
        AtomicReference firstException = new AtomicReference();
        this.stopped = true;
        Utils.closeQuietly((AutoCloseable)this.coordinator, (String)"coordinator", firstException);
        Utils.closeQuietly((AutoCloseable)this.metrics, (String)"consumer metrics", firstException);
        Utils.closeQuietly((AutoCloseable)this.client, (String)"consumer network client", firstException);
        AppInfoParser.unregisterAppInfo((String)JMX_PREFIX, (String)this.clientId, (Metrics)this.metrics);
        if (firstException.get() != null && !swallowException) {
            throw new KafkaException("Failed to stop the Connect group member", (Throwable)firstException.get());
        }
        this.log.debug("The Connect group member has stopped.");
    }

    Metrics metrics() {
        return this.metrics;
    }
}

