/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ApiVersions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Metrics;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Sensor;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.LogContext;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.Time;

/**
 * {@link EventHandler} implementation backed by two queues and a
 * {@link DefaultBackgroundThread}.
 *
 * <p>Application events handed to {@link #add(ApplicationEvent)} are placed on the
 * application event queue; background events are read from the background event queue
 * through {@link #poll()} and {@link #isEmpty()}. Both queues are also passed to the
 * background thread when this class constructs it.
 */
public class DefaultEventHandler implements EventHandler {
    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final DefaultBackgroundThread backgroundThread;

    /**
     * Creates a handler that uses {@link Time#SYSTEM} and two freshly created
     * {@link LinkedBlockingQueue}s, delegating everything else to the full constructor.
     */
    public DefaultEventHandler(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig, LogContext logContext, SubscriptionState subscriptionState, ApiVersions apiVersions, Metrics metrics, ClusterResourceListeners clusterResourceListeners, Sensor fetcherThrottleTimeSensor) {
        this(Time.SYSTEM, config, groupRebalanceConfig, logContext, new LinkedBlockingQueue<ApplicationEvent>(), new LinkedBlockingQueue<BackgroundEvent>(), subscriptionState, apiVersions, metrics, clusterResourceListeners, fetcherThrottleTimeSensor);
    }

    /**
     * Creates a handler around the supplied queues, builds and bootstraps the consumer
     * metadata from {@code config}, then constructs and starts the background thread.
     *
     * @param applicationEventQueue queue that {@link #add(ApplicationEvent)} writes to
     * @param backgroundEventQueue  queue that {@link #poll()} and {@link #isEmpty()} read from
     */
    public DefaultEventHandler(Time time, ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig, LogContext logContext, BlockingQueue<ApplicationEvent> applicationEventQueue, BlockingQueue<BackgroundEvent> backgroundEventQueue, SubscriptionState subscriptionState, ApiVersions apiVersions, Metrics metrics, ClusterResourceListeners clusterResourceListeners, Sensor fetcherThrottleTimeSensor) {
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;
        ConsumerMetadata metadata = new ConsumerMetadata(config, subscriptionState, logContext, clusterResourceListeners);
        // Seed the metadata with the addresses parsed out of the consumer configuration.
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
        metadata.bootstrap(addresses);
        this.backgroundThread = new DefaultBackgroundThread(time, config, groupRebalanceConfig, logContext, this.applicationEventQueue, this.backgroundEventQueue, metadata, subscriptionState, apiVersions, metrics, fetcherThrottleTimeSensor);
        this.backgroundThread.start();
    }

    /**
     * Package-private constructor that wraps an already constructed background thread
     * and starts it. NOTE(review): presumably intended for tests - confirm against callers.
     */
    DefaultEventHandler(DefaultBackgroundThread backgroundThread, BlockingQueue<ApplicationEvent> applicationEventQueue, BlockingQueue<BackgroundEvent> backgroundEventQueue) {
        this.backgroundThread = backgroundThread;
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;
        backgroundThread.start();
    }

    /**
     * Retrieves and removes the head of the background event queue without blocking.
     *
     * @return the next background event, or {@link Optional#empty()} when the queue is empty
     */
    @Override
    public Optional<BackgroundEvent> poll() {
        return Optional.ofNullable(this.backgroundEventQueue.poll());
    }

    /**
     * @return {@code true} when the background event queue currently holds no events
     */
    @Override
    public boolean isEmpty() {
        return this.backgroundEventQueue.isEmpty();
    }

    /**
     * Enqueues an application event and then wakes the background thread.
     *
     * @param event the event to place on the application event queue
     * @return the result of {@link BlockingQueue#add(Object)} on the application event queue
     */
    @Override
    public boolean add(ApplicationEvent event) {
        // Enqueue BEFORE waking the background thread. With the opposite order the thread
        // can wake up, observe a still-empty queue and resume waiting before the event is
        // inserted, leaving the event unprocessed until some later wakeup or timeout.
        // NOTE(review): assumes the background thread inspects this queue when woken - confirm.
        boolean added = this.applicationEventQueue.add(event);
        this.backgroundThread.wakeup();
        return added;
    }

    /**
     * Closes the background thread.
     *
     * @throws RuntimeException wrapping any exception thrown by the background thread's
     *                          {@code close()}
     */
    @Override
    public void close() {
        try {
            this.backgroundThread.close();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

