/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaStreamsMetadataObserver;
import io.confluent.kafkarest.SimpleConsumerFactory;
import io.confluent.kafkarest.TPConsumerState;
import io.confluent.kafkarest.Time;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumerPool {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumerPool.class);
    private final int maxPoolSize;
    private final int poolInstanceAvailabilityTimeoutMs;
    private final Time time;
    private final SimpleConsumerFactory simpleConsumerFactory;
    private final Map<String, TPConsumerState> simpleConsumers;
    private final Queue<String> availableKafkaConsumers;
    private final Queue<String> availableStreamsConsumers;
    private final KafkaStreamsMetadataObserver metadataObserver;
    private final Lock poolLock;

    public SimpleConsumerPool(int maxPoolSize, int poolInstanceAvailabilityTimeoutMs, Time time, SimpleConsumerFactory simpleConsumerFactory, KafkaStreamsMetadataObserver metadataObserver) {
        this.maxPoolSize = maxPoolSize;
        this.poolInstanceAvailabilityTimeoutMs = poolInstanceAvailabilityTimeoutMs;
        this.time = time;
        this.simpleConsumerFactory = simpleConsumerFactory;
        this.metadataObserver = metadataObserver;
        this.poolLock = new ReentrantLock(true);
        this.simpleConsumers = new HashMap<String, TPConsumerState>();
        this.availableKafkaConsumers = new LinkedList<String>();
        this.availableStreamsConsumers = new LinkedList<String>();
    }

    public synchronized TPConsumerState get(String topic, int partition) {
        boolean requestToStreams;
        long expiration = this.time.milliseconds() + (long)this.poolInstanceAvailabilityTimeoutMs;
        do {
            requestToStreams = this.metadataObserver.requestToStreams(topic);
            String consumerId = null;
            if (requestToStreams) {
                if (this.availableStreamsConsumers.size() > 0) {
                    consumerId = this.availableStreamsConsumers.remove();
                }
            } else if (this.availableKafkaConsumers.size() > 0) {
                consumerId = this.availableKafkaConsumers.remove();
            }
            if (consumerId != null) {
                TPConsumerState consumerState = this.simpleConsumers.get(consumerId);
                consumerState.consumer().assign(Collections.singletonList(new TopicPartition(topic, partition)));
                return consumerState;
            }
            if (this.simpleConsumers.size() < this.maxPoolSize || this.maxPoolSize == 0) {
                return this.createAndAssign(topic, partition, requestToStreams);
            }
            try {
                this.wait(this.poolInstanceAvailabilityTimeoutMs);
            }
            catch (InterruptedException e) {
                log.warn("A thread requesting a SimpleConsumer has been interrupted while waiting", (Throwable)e);
            }
        } while (this.time.milliseconds() <= expiration || this.poolInstanceAvailabilityTimeoutMs == 0);
        boolean removed = false;
        try {
            if (requestToStreams) {
                if (this.availableKafkaConsumers.size() > 0) {
                    TPConsumerState removedConsumer = this.simpleConsumers.remove(this.availableKafkaConsumers.remove());
                    removedConsumer.consumer().close();
                    removed = true;
                }
            } else if (this.availableStreamsConsumers.size() > 0) {
                TPConsumerState removedConsumer = this.simpleConsumers.remove(this.availableStreamsConsumers.remove());
                removedConsumer.consumer().close();
                removed = true;
            }
        }
        catch (Exception e) {
            log.warn("Exception while closing consumer", (Throwable)e);
        }
        if (removed) {
            return this.createAndAssign(topic, partition, requestToStreams);
        }
        throw Errors.simpleConsumerPoolTimeoutException();
    }

    private synchronized TPConsumerState createAndAssign(String topic, int partition, boolean isStreamsConsumer) {
        SimpleConsumerFactory.ConsumerProvider simpleConsumer = this.simpleConsumerFactory.createConsumer();
        simpleConsumer.consumer().assign(Collections.singletonList(new TopicPartition(topic, partition)));
        TPConsumerState consumerState = new TPConsumerState(simpleConsumer.consumer(), isStreamsConsumer, this, simpleConsumer.clientId());
        this.simpleConsumers.put(simpleConsumer.clientId(), consumerState);
        return consumerState;
    }

    public synchronized void release(TPConsumerState tpConsumerState) {
        log.debug("Releasing into the pool SimpleConsumer with id " + tpConsumerState.clientId());
        if (tpConsumerState.isStreams()) {
            this.availableStreamsConsumers.add(tpConsumerState.clientId());
        } else {
            this.availableKafkaConsumers.add(tpConsumerState.clientId());
        }
        this.notify();
    }

    public void shutdown() {
        for (TPConsumerState consumer : this.simpleConsumers.values()) {
            consumer.consumer().wakeup();
            consumer.consumer().close();
        }
    }

    public int size() {
        return this.simpleConsumers.size();
    }
}

