/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafkarest.ConsumerInstanceId;
import io.confluent.kafkarest.ConsumerState;
import io.confluent.kafkarest.ConsumerWorker;
import io.confluent.kafkarest.ConsumerWorkerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.KafkaStreamsMetadataObserver;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.entities.AbstractConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestNotFoundException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.common.InvalidConfigException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);

    /** REST proxy configuration; source of the clock, the worker count and the base consumer properties. */
    private final KafkaRestConfig config;
    /** Clock obtained from {@link #config}; read by the expiration thread. */
    private final Time time;
    /** Value of the {@code bootstrap.servers} setting, copied into every consumer's properties. */
    private final String bootstrapServers;
    /** Used to check topic existence and whether impersonation is enabled before a read. */
    private final KafkaStreamsMetadataObserver mdObserver;
    /**
     * Registered consumer instances by id. Accessed only while holding this manager's monitor.
     * A {@code null} value marks an id reserved by an in-flight {@code createConsumer} call.
     */
    private final Map<ConsumerInstanceId, ConsumerState> consumers = new HashMap<ConsumerInstanceId, ConsumerState>();
    /** Read workers started by the constructor. */
    private final List<ConsumerWorker> workers;
    /** Monotonic counter used to pick the next worker from {@link #workers}. */
    private final AtomicInteger nextWorker;
    /** Single-threaded executor that runs offset commits and closes expired consumers. */
    private final ExecutorService executor;
    /** Factory handed to {@code EmbeddedFormat.createConsumerState}; assigned once in the constructor. */
    private final ConsumerFactory consumerFactory;
    /**
     * Consumers that are currently idle, in the queue's natural order (presumably soonest
     * expiration first - the expiration thread only ever inspects the head). Accessed only
     * while holding this manager's monitor.
     */
    private final PriorityQueue<ConsumerState> consumersByExpiration = new PriorityQueue<ConsumerState>();
    /** Background thread that evicts expired consumers. */
    private final ExpirationThread expirationThread;

    /**
     * Builds a manager, starts one read worker per configured {@code consumer.threads}, and
     * starts the consumer expiration thread.
     *
     * @param config REST proxy configuration
     * @param mdObserver metadata observer used for topic and impersonation checks
     * @param consumerFactory factory for the underlying Kafka consumers; when {@code null} a
     *     default factory that instantiates {@link KafkaConsumer} directly is used
     */
    public ConsumerManager(KafkaRestConfig config, KafkaStreamsMetadataObserver mdObserver, ConsumerFactory consumerFactory) {
        this.config = config;
        this.time = config.getTime();
        this.bootstrapServers = config.getString("bootstrap.servers");
        this.mdObserver = mdObserver;
        if (consumerFactory == null) {
            // No factory supplied: fall back to plain KafkaConsumer construction.
            this.consumerFactory = new ConsumerFactory() {
                @Override
                public <K, V> KafkaConsumer<K, V> createConsumer(Properties props, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
                    return new KafkaConsumer<K, V>(props, keyDeserializer, valueDeserializer);
                }
            };
        } else {
            this.consumerFactory = consumerFactory;
        }

        this.workers = new Vector<ConsumerWorker>();
        int started = 0;
        while (started < config.getInt("consumer.threads")) {
            ConsumerWorker readWorker = new ConsumerWorker(config);
            this.workers.add(readWorker);
            readWorker.start();
            started++;
        }
        this.nextWorker = new AtomicInteger(0);
        this.executor = Executors.newFixedThreadPool(1);

        // Started last: the expiration thread uses the executor and the consumer collections.
        this.expirationThread = new ExpirationThread();
        this.expirationThread.start();
    }

    /**
     * Builds a manager that uses the default consumer factory.
     *
     * <p>Delegates to the three-argument constructor with a {@code null} factory.
     *
     * @param config REST proxy configuration
     * @param mdObserver metadata observer used for topic and impersonation checks
     */
    public ConsumerManager(KafkaRestConfig config, KafkaStreamsMetadataObserver mdObserver) {
        this(config, mdObserver, null);
    }

    /**
     * Creates a consumer instance in {@code group} and registers it with this manager.
     *
     * <p>The instance name is the id from {@code instanceConfig} when one is set, otherwise its
     * name, otherwise a generated {@code rest-consumer-[<server id>-]<uuid>} string. If an
     * instance with the same group and name is already registered, the exception produced by
     * {@code Errors.consumerAlreadyExistsException()} is thrown; an {@link InvalidConfigException}
     * raised while building the consumer is rethrown via
     * {@code Errors.invalidConsumerConfigException}.
     *
     * @param group consumer group the instance joins
     * @param instanceConfig requested id/name, format and consumer settings
     * @return the name under which the instance was registered
     */
    public String createConsumer(String group, ConsumerInstanceConfig instanceConfig) {
        String name = instanceConfig.getName();
        if (instanceConfig.getId() != null) {
            name = instanceConfig.getId();
        }
        if (name == null) {
            name = "rest-consumer-";
            String serverId = this.config.getString("id");
            if (!serverId.isEmpty()) {
                name = name + serverId + "-";
            }
            name = name + UUID.randomUUID().toString();
        }
        ConsumerInstanceId cid = new ConsumerInstanceId(group, name);

        // Reserve the id with a null placeholder so the (potentially slow) consumer
        // construction below can run without holding the manager lock.
        synchronized (this) {
            if (this.consumers.containsKey(cid)) {
                throw Errors.consumerAlreadyExistsException();
            }
            this.consumers.put(cid, null);
        }

        boolean succeeded = false;
        try {
            log.debug("Creating consumer " + name + " in group " + group);
            Properties props = (Properties) this.config.getOriginalProperties().clone();
            props.setProperty("bootstrap.servers", this.bootstrapServers);
            props.setProperty("group.id", group);
            if (instanceConfig.getId() != null) {
                props.setProperty("consumer.id", instanceConfig.getId());
            }
            // Auto-commit is off unless the request asks for it explicitly.
            if (instanceConfig.getAutoCommitEnable() != null) {
                props.setProperty("enable.auto.commit", instanceConfig.getAutoCommitEnable());
            } else {
                props.setProperty("enable.auto.commit", "false");
            }
            if (instanceConfig.getAutoOffsetReset() != null) {
                props.setProperty("auto.offset.reset", instanceConfig.getAutoOffsetReset());
            }
            String defaultStream = this.config.getString("streams.default.stream");
            if (!"".equals(defaultStream)) {
                props.put("streams.consumer.default.stream", defaultStream);
            }

            ConsumerState state;
            try {
                state = EmbeddedFormat.createConsumerState(instanceConfig.getFormat(), this.config, cid, props, this.consumerFactory);
            } catch (InvalidConfigException e) {
                throw Errors.invalidConsumerConfigException(e);
            }

            synchronized (this) {
                // Replace the placeholder and wake the expiration thread so it
                // recomputes its wait time with the new consumer in the queue.
                this.consumers.put(cid, state);
                this.consumersByExpiration.add(state);
                this.notifyAll();
            }
            succeeded = true;
            return name;
        } finally {
            if (!succeeded) {
                // Release the reservation so the name can be reused after a failure.
                synchronized (this) {
                    this.consumers.remove(cid);
                }
            }
        }
    }

    /**
     * Starts an asynchronous read of {@code topic} on behalf of a consumer instance.
     *
     * <p>The request is rejected through {@code callback} (and {@code null} is returned) when
     * the instance is unknown, when it is not an instance of {@code consumerStateType}, or when
     * the topic does not exist. Otherwise the read is handed to a worker and the callback is
     * invoked when the worker completes; non-{@link RestException} failures are converted with
     * {@code Errors.kafkaErrorException}.
     *
     * @param group consumer group of the instance
     * @param instance name of the instance
     * @param topic topic to read from
     * @param consumerStateType state class the caller's format requires
     * @param maxBytes size limit forwarded to the worker
     * @param callback receives either the records or the failure
     * @return the worker's future, or {@code null} if the request was rejected up front
     */
    public <KafkaK, KafkaV, ClientK, ClientV> Future readTopic(String group, String instance, String topic, Class<? extends ConsumerState<KafkaK, KafkaV, ClientK, ClientV>> consumerStateType, long maxBytes, final ReadCallback callback) {
        final ConsumerState state;
        try {
            // Also takes the instance out of the expiration queue for the duration of the read.
            state = this.getConsumerInstance(group, instance);
        } catch (RestNotFoundException e) {
            callback.onCompletion(null, e);
            return null;
        }

        if (!consumerStateType.isInstance(state)) {
            // Put the instance back in the expiration queue; otherwise a rejected
            // request would leave it untracked and it could never expire.
            this.updateExpiration(state);
            callback.onCompletion(null, Errors.consumerFormatMismatch());
            return null;
        }

        final boolean impersonating = this.mdObserver.isImpersonationEnabled();
        KafkaStreamsMetadataObserver topicObserver = impersonating
                ? new KafkaStreamsMetadataObserver(this.config, this.mdObserver.getZkUtils(), this.mdObserver.isStreams(), true)
                : this.mdObserver;
        if (!topicObserver.topicExists(topic)) {
            // Same as above: re-register for expiration before bailing out.
            this.updateExpiration(state);
            callback.onCompletion(null, Errors.topicNotFoundException());
            return null;
        }

        ConsumerWorker worker;
        if (impersonating) {
            // NOTE(review): this per-request worker is started but never shut down in this
            // method - confirm ConsumerWorker terminates on its own, otherwise every
            // impersonated read leaks a thread.
            worker = new ConsumerWorker(this.config);
            worker.start();
        } else {
            // floorMod keeps the index non-negative after the counter wraps past Integer.MAX_VALUE.
            int workerId = Math.floorMod(this.nextWorker.getAndIncrement(), this.workers.size());
            worker = this.workers.get(workerId);
        }

        return worker.readTopic(state, topic, maxBytes, new ConsumerWorkerReadCallback<ClientK, ClientV>(){

            @Override
            public void onCompletion(List<? extends AbstractConsumerRecord<ClientK, ClientV>> records, Exception e) {
                // The read is over (successfully or not): refresh and re-queue the expiration.
                ConsumerManager.this.updateExpiration(state);
                if (e != null) {
                    Exception responseException = e;
                    if (!(e instanceof RestException)) {
                        responseException = Errors.kafkaErrorException(e);
                    }
                    callback.onCompletion(null, responseException);
                } else {
                    callback.onCompletion(records, null);
                }
            }
        });
    }

    /**
     * Commits the offsets of a consumer instance on the manager's executor.
     *
     * <p>If the instance is unknown the callback is invoked with the not-found exception and
     * {@code null} is returned. Otherwise the callback receives the committed offsets, or the
     * failure (converted with {@code Errors.kafkaErrorException} unless it already is a
     * {@link RestException}). The instance's expiration is refreshed once the commit finishes.
     *
     * @param group consumer group of the instance
     * @param instance name of the instance
     * @param callback receives either the committed offsets or the failure
     * @return the future of the submitted commit task, or {@code null} if the instance is unknown
     */
    public Future commitOffsets(String group, String instance, final CommitCallback callback) {
        final ConsumerState consumer;
        try {
            consumer = this.getConsumerInstance(group, instance);
        } catch (RestNotFoundException notFound) {
            callback.onCompletion(null, notFound);
            return null;
        }

        Runnable commitTask = new Runnable() {
            @Override
            public void run() {
                try {
                    List<TopicPartitionOffset> committed = consumer.commitOffsets();
                    callback.onCompletion(committed, null);
                } catch (Exception failure) {
                    log.error("Failed to commit offsets for consumer " + consumer.getId().toString(), (Throwable) failure);
                    Exception reported = failure instanceof RestException
                            ? failure
                            : Errors.kafkaErrorException(failure);
                    callback.onCompletion(null, reported);
                } finally {
                    // Re-queue the instance for expiration whatever the outcome.
                    ConsumerManager.this.updateExpiration(consumer);
                }
            }
        };
        return this.executor.submit(commitTask);
    }

    /**
     * Unregisters a consumer instance and closes it.
     *
     * <p>The lookup throws the exception from {@code Errors.consumerInstanceNotFoundException()}
     * when no such instance is registered.
     *
     * @param group consumer group of the instance
     * @param instance name of the instance
     */
    public void deleteConsumer(String group, String instance) {
        log.debug("Destroying consumer " + instance + " in group " + group);
        // remove=true drops the instance from both the registry and the expiration queue.
        this.getConsumerInstance(group, instance, true).close();
    }

    /**
     * Stops the read workers and the expiration thread, closes every registered consumer and
     * shuts down the executor.
     */
    public void shutdown() {
        log.debug("Shutting down consumers");
        synchronized (this) {
            for (ConsumerWorker worker : this.workers) {
                log.trace("Shutting down worker " + worker.toString());
                worker.shutdown();
            }
            this.workers.clear();
        }

        // Must happen outside the monitor: the expiration thread needs the monitor
        // to leave its wait() and finish, and shutdown() blocks until it has.
        log.trace("Shutting down consumer expiration thread");
        this.expirationThread.shutdown();

        synchronized (this) {
            for (ConsumerState state : this.consumers.values()) {
                // createConsumer() parks a null placeholder while an instance is being
                // built; there is nothing to close for those entries.
                if (state != null) {
                    state.close();
                }
            }
            this.consumers.clear();
            this.consumersByExpiration.clear();
            this.executor.shutdown();
        }
    }

    /**
     * Looks up a consumer instance and takes it out of the expiration queue.
     *
     * @param group consumer group of the instance
     * @param instance name of the instance
     * @param remove when {@code true} the instance is also removed from the registry
     * @return the instance's state, never {@code null}; an unknown instance (or one whose
     *     registration is still a placeholder) results in the exception from
     *     {@code Errors.consumerInstanceNotFoundException()}
     */
    private synchronized ConsumerState getConsumerInstance(String group, String instance, boolean remove) {
        ConsumerInstanceId key = new ConsumerInstanceId(group, instance);
        ConsumerState found;
        if (remove) {
            found = this.consumers.remove(key);
        } else {
            found = this.consumers.get(key);
        }
        if (found == null) {
            throw Errors.consumerInstanceNotFoundException();
        }
        // Dequeue so the expiration thread cannot evict the instance while it is in use;
        // updateExpiration() queues it again when a read or commit completes.
        this.consumersByExpiration.remove(found);
        return found;
    }

    /**
     * Looks up a consumer instance without unregistering it.
     *
     * <p>Delegates to the three-argument overload with {@code remove = false}.
     *
     * @param group consumer group of the instance
     * @param instance name of the instance
     * @return the instance's state
     */
    private ConsumerState getConsumerInstance(String group, String instance) {
        return this.getConsumerInstance(group, instance, false);
    }

    /**
     * Refreshes a consumer's expiration, queues it for expiration tracking and wakes any
     * thread waiting on this manager's monitor.
     *
     * @param state consumer whose operation has just finished
     */
    private void updateExpiration(ConsumerState state) {
        synchronized (this) {
            state.updateExpiration();
            this.consumersByExpiration.add(state);
            // Lets the expiration thread recompute how long it should sleep.
            this.notifyAll();
        }
    }

    /**
     * Daemon thread that evicts expired consumers.
     *
     * <p>While running it holds the enclosing manager's monitor, removes every consumer at the
     * head of {@code consumersByExpiration} that reports itself expired, schedules its
     * {@code close()} on the manager's executor, and then waits on the monitor until the next
     * expiration or until it is notified.
     */
    private class ExpirationThread
    extends Thread {
        private final AtomicBoolean isRunning;
        private final CountDownLatch shutdownLatch;

        public ExpirationThread() {
            super("Consumer Expiration Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            this.setDaemon(true);
        }

        @Override
        public void run() {
            try {
                synchronized (ConsumerManager.this) {
                    while (this.isRunning.get()) {
                        long now = ConsumerManager.this.time.milliseconds();
                        while (!ConsumerManager.this.consumersByExpiration.isEmpty() && ConsumerManager.this.consumersByExpiration.peek().expired(now)) {
                            final ConsumerState state = ConsumerManager.this.consumersByExpiration.remove();
                            ConsumerManager.this.consumers.remove(state.getId());
                            // Close on the executor so a slow close() does not stall eviction
                            // while the manager's monitor is held.
                            ConsumerManager.this.executor.submit(new Runnable(){

                                @Override
                                public void run() {
                                    state.close();
                                }
                            });
                        }
                        // NOTE(review): assumes untilExpiration(now) is strictly positive for a
                        // consumer that is not yet expired - wait(0) would block until notified
                        // and a negative value would throw; confirm against ConsumerState.
                        long timeout = ConsumerManager.this.consumersByExpiration.isEmpty() ? Long.MAX_VALUE : ConsumerManager.this.consumersByExpiration.peek().untilExpiration(now);
                        ConsumerManager.this.wait(timeout);
                    }
                }
            } catch (InterruptedException ignored) {
                // shutdown() interrupts this thread to break it out of wait(); just exit.
            } finally {
                // Always release shutdown(), even if the loop died with a runtime exception;
                // otherwise shutdown() would block forever on the latch.
                this.shutdownLatch.countDown();
            }
        }

        /**
         * Asks the thread to stop and blocks until {@link #run()} has finished.
         *
         * @throws Error if the calling thread is interrupted while waiting; the interrupt flag
         *     is restored and the {@link InterruptedException} is attached as the cause
         */
        public void shutdown() {
            try {
                this.isRunning.set(false);
                this.interrupt();
                this.shutdownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new Error("Interrupted when shutting down consumer worker thread.", e);
            }
        }
    }

    /**
     * Creates the Kafka consumer that backs a consumer instance. Supplying a custom
     * implementation to the manager's constructor replaces the default, which instantiates
     * {@code KafkaConsumer} directly.
     */
    public interface ConsumerFactory {
        /**
         * @param props consumer configuration
         * @param keyDeserializer deserializer for record keys
         * @param valueDeserializer deserializer for record values
         * @return a consumer built from the given configuration and deserializers
         */
        <K, V> Consumer<K, V> createConsumer(Properties props, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);
    }

    public static interface CommitCallback {
        public void onCompletion(List<TopicPartitionOffset> var1, Exception var2);
    }

    public static interface ReadCallback<K, V> {
        public void onCompletion(List<? extends AbstractConsumerRecord<K, V>> var1, Exception var2);
    }
}

