/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.ConsumerInstanceId;
import io.confluent.kafkarest.ConsumerWorkerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.entities.AbstractConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerAssignmentRequest;
import io.confluent.kafkarest.entities.ConsumerAssignmentResponse;
import io.confluent.kafkarest.entities.ConsumerCommittedRequest;
import io.confluent.kafkarest.entities.ConsumerCommittedResponse;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerSeekToOffsetRequest;
import io.confluent.kafkarest.entities.ConsumerSeekToRequest;
import io.confluent.kafkarest.entities.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.ConsumerSubscriptionResponse;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.entities.TopicPartitionOffsetMetadata;
import io.confluent.kafkarest.v2.AvroKafkaConsumerState;
import io.confluent.kafkarest.v2.BinaryKafkaConsumerState;
import io.confluent.kafkarest.v2.JsonKafkaConsumerState;
import io.confluent.kafkarest.v2.KafkaConsumerState;
import io.confluent.kafkarest.v2.KafkaConsumerWorker;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestNotFoundException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerManager.class);
    /** Proxy configuration; source of the consumer defaults, the server id and the worker count. */
    private final KafkaRestConfig config;
    /** Clock taken from the configuration; the expiration thread reads the current time from it. */
    private final Time time;
    /** Value of {@code config.bootstrapBrokers()}, applied to every new consumer as "bootstrap.servers". */
    private final String bootstrapServers;
    /**
     * Registered consumers keyed by (group, instance). A {@code null} value marks an id that
     * has been reserved by a {@code createConsumer} call that is still in progress. Accessed
     * while holding this manager's monitor.
     */
    private final Map<ConsumerInstanceId, KafkaConsumerState> consumers = new HashMap<ConsumerInstanceId, KafkaConsumerState>();
    /** Read workers; read requests are distributed over them round-robin. */
    private final List<KafkaConsumerWorker> workers;
    /** Monotonic counter used to pick the next read worker. */
    private final AtomicInteger nextWorker;
    /** Single-threaded pool that runs offset commits and closes expired consumers. */
    private final ExecutorService executor;
    /** Optional consumer factory; when {@code null} a {@code KafkaConsumer} is constructed directly. */
    private KafkaConsumerFactory consumerFactory;
    /**
     * Consumers that are currently idle, ordered by the queue's natural ordering of
     * {@code KafkaConsumerState}. Accessed while holding this manager's monitor.
     */
    private final PriorityQueue<KafkaConsumerState> consumersByExpiration = new PriorityQueue<KafkaConsumerState>();
    /** Background thread that removes and closes consumers once they report themselves expired. */
    private final ExpirationThread expirationThread;

    public KafkaConsumerManager(KafkaRestConfig config) {
        this.config = config;
        this.time = config.getTime();
        this.bootstrapServers = config.bootstrapBrokers();
        this.workers = new Vector<KafkaConsumerWorker>();
        for (int i = 0; i < config.getInt("consumer.threads"); ++i) {
            KafkaConsumerWorker worker = new KafkaConsumerWorker(config);
            this.workers.add(worker);
            worker.start();
        }
        this.nextWorker = new AtomicInteger(0);
        this.executor = Executors.newFixedThreadPool(1);
        this.consumerFactory = null;
        this.expirationThread = new ExpirationThread();
        this.expirationThread.start();
    }

    /**
     * Package-private constructor that lets callers supply their own consumer factory.
     *
     * <p>All threads are started by the delegated single-argument constructor before the
     * factory is stored.
     *
     * @param config          proxy configuration
     * @param consumerFactory factory used by {@code createConsumer} instead of constructing a
     *                        {@code KafkaConsumer} directly
     */
    KafkaConsumerManager(KafkaRestConfig config, KafkaConsumerFactory consumerFactory) {
        this(config);
        this.consumerFactory = consumerFactory;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a consumer instance in {@code group} and registers it with this manager.
     *
     * <p>The instance id is reserved first (with a {@code null} placeholder) so that a
     * concurrent request for the same id fails fast while the consumer is being built
     * outside the lock. If anything fails afterwards the reservation is released and any
     * consumer that was already constructed is closed.
     *
     * @param group          consumer group to join
     * @param instanceConfig requested id/name, format and consumer overrides
     * @return the name of the created instance
     * @throws RestServerErrorException if the requested embedded format is not supported
     */
    public String createConsumer(String group, ConsumerInstanceConfig instanceConfig) {
        final String name = this.resolveConsumerName(instanceConfig);
        final ConsumerInstanceId cid = new ConsumerInstanceId(group, name);
        synchronized (this) {
            if (this.consumers.containsKey(cid)) {
                throw Errors.consumerAlreadyExistsException();
            }
            // Placeholder: reserves the id while the (slow) consumer construction runs unlocked.
            this.consumers.put(cid, null);
        }
        boolean succeeded = false;
        Consumer consumer = null;
        try {
            log.debug("Creating consumer {} in group {}", name, group);
            Properties props = this.buildConsumerProperties(group, instanceConfig);
            try {
                consumer = this.consumerFactory == null ? new KafkaConsumer(props) : this.consumerFactory.createConsumer(props);
            }
            catch (Exception e) {
                // NOTE(review): the original code deliberately ignored this failure and went on
                // with a null consumer; that best-effort behaviour is kept, but the failure is
                // now visible at WARN instead of being hidden at DEBUG.
                log.warn("Failed to create Kafka consumer {} in group {}; continuing without an underlying consumer", name, group, e);
            }
            KafkaConsumerState state;
            switch (instanceConfig.getFormat()) {
                case BINARY: {
                    state = new BinaryKafkaConsumerState(this.config, cid, consumer);
                    break;
                }
                case AVRO: {
                    state = new AvroKafkaConsumerState(this.config, cid, consumer);
                    break;
                }
                case JSON: {
                    state = new JsonKafkaConsumerState(this.config, cid, consumer);
                    break;
                }
                default: {
                    throw new RestServerErrorException("Invalid embedded format for new consumer.", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
                }
            }
            synchronized (this) {
                this.consumers.put(cid, state);
                this.consumersByExpiration.add(state);
                // Wake the expiration thread so it recomputes its wait time.
                this.notifyAll();
            }
            succeeded = true;
            return name;
        }
        finally {
            if (!succeeded) {
                synchronized (this) {
                    this.consumers.remove(cid);
                }
                if (consumer != null) {
                    // Nothing owns the consumer on the failure path, so release it here.
                    try {
                        consumer.close();
                    }
                    catch (RuntimeException closeFailure) {
                        log.warn("Failed to close consumer {} in group {} after unsuccessful creation", name, group, closeFailure);
                    }
                }
            }
        }
    }

    /**
     * Picks the instance name: the explicit id wins over the configured name; when neither is
     * set a name of the form "rest-consumer-[serverId-]UUID" is generated.
     */
    private String resolveConsumerName(ConsumerInstanceConfig instanceConfig) {
        String name = instanceConfig.getName();
        if (instanceConfig.getId() != null) {
            name = instanceConfig.getId();
        }
        if (name != null) {
            return name;
        }
        StringBuilder generated = new StringBuilder("rest-consumer-");
        String serverId = this.config.getString("id");
        if (!serverId.isEmpty()) {
            generated.append(serverId).append("-");
        }
        return generated.append(UUID.randomUUID().toString()).toString();
    }

    /** Builds the Kafka consumer properties from the proxy defaults plus per-instance overrides. */
    private Properties buildConsumerProperties(String group, ConsumerInstanceConfig instanceConfig) {
        Properties props = this.config.getConsumerProperties();
        props.setProperty("bootstrap.servers", this.bootstrapServers);
        String defaultStream = this.config.getString("streams.default.stream");
        if (!"".equals(defaultStream)) {
            props.put("streams.consumer.default.stream", defaultStream);
        }
        props.setProperty("group.id", group);
        if (instanceConfig.getId() != null) {
            props.setProperty("consumer.id", instanceConfig.getId());
        }
        if (instanceConfig.getAutoCommitEnable() != null) {
            props.setProperty("enable.auto.commit", instanceConfig.getAutoCommitEnable());
        }
        if (instanceConfig.getAutoOffsetReset() != null) {
            props.setProperty("auto.offset.reset", instanceConfig.getAutoOffsetReset());
        }
        props.setProperty("schema.registry.url", this.config.getString("schema.registry.url"));
        // Only Avro uses its own deserializer; every other format is read as raw bytes.
        switch (instanceConfig.getFormat()) {
            case AVRO: {
                props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
                props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
                break;
            }
            default: {
                props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            }
        }
        return props;
    }

    /**
     * Schedules an asynchronous read for the given consumer on one of the read workers.
     *
     * <p>The consumer lookup takes the instance out of the expiration queue for the duration
     * of the read; it is re-queued (with a refreshed expiration) when the worker reports
     * completion, or immediately if the request is rejected because of a format mismatch.
     *
     * @param group             consumer group
     * @param instance          consumer instance name
     * @param consumerStateType state class the caller expects; must match the instance's format
     * @param timeout           read timeout passed to the worker
     * @param maxBytes          response size limit passed to the worker
     * @param callback          receives either the records or an exception, never both
     * @return the worker's future, or {@code null} when the request failed before scheduling
     *         (in which case {@code callback} has already been invoked with the error)
     */
    public <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> Future readRecords(String group, String instance, Class<? extends KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> consumerStateType, long timeout, long maxBytes, final ReadCallback callback) {
        final KafkaConsumerState state;
        try {
            state = this.getConsumerInstance(group, instance);
        }
        catch (RestNotFoundException e) {
            callback.onCompletion(null, e);
            return null;
        }
        if (!consumerStateType.isInstance(state)) {
            // The lookup above removed the instance from the expiration queue; put it back,
            // otherwise a consumer that is only ever hit with the wrong format never expires.
            this.updateExpiration(state);
            callback.onCompletion(null, Errors.consumerFormatMismatch());
            return null;
        }
        // Mask the sign bit so the index stays non-negative after the counter overflows.
        int workerId = (this.nextWorker.getAndIncrement() & Integer.MAX_VALUE) % this.workers.size();
        KafkaConsumerWorker worker = this.workers.get(workerId);
        return worker.readRecords(state, timeout, maxBytes, new ConsumerWorkerReadCallback<ClientKeyT, ClientValueT>(){

            @Override
            public void onCompletion(List<? extends AbstractConsumerRecord<ClientKeyT, ClientValueT>> records, Exception e) {
                KafkaConsumerManager.this.updateExpiration(state);
                if (e != null) {
                    // Anything that is not already a REST error is reported as a Kafka error.
                    Exception responseException = e;
                    if (!(e instanceof RestException)) {
                        responseException = Errors.kafkaErrorException(e);
                    }
                    callback.onCompletion(null, responseException);
                } else {
                    callback.onCompletion(records, null);
                }
            }
        });
    }

    /**
     * Commits offsets for a consumer on the manager's executor and reports the outcome
     * through {@code callback}.
     *
     * @param group               consumer group
     * @param instance            consumer instance name
     * @param async               passed through unchanged to the consumer state
     * @param offsetCommitRequest offsets to commit, passed through to the consumer state
     * @param callback            receives the committed offsets or the failure
     * @return the future of the submitted task, or {@code null} when the consumer does not
     *         exist (the callback has then already been invoked with the error)
     */
    public Future commitOffsets(String group, String instance, final String async, final ConsumerOffsetCommitRequest offsetCommitRequest, final CommitCallback callback) {
        final KafkaConsumerState consumerState;
        try {
            consumerState = this.getConsumerInstance(group, instance);
        }
        catch (RestNotFoundException notFound) {
            callback.onCompletion(null, notFound);
            return null;
        }
        Runnable commitTask = new Runnable(){

            @Override
            public void run() {
                try {
                    List<TopicPartitionOffset> committed = consumerState.commitOffsets(async, offsetCommitRequest);
                    callback.onCompletion(committed, null);
                }
                catch (Exception failure) {
                    log.error("Failed to commit offsets for consumer " + consumerState.getId().toString(), (Throwable)failure);
                    // REST errors pass through; everything else is wrapped as a Kafka error.
                    Exception reported = failure instanceof RestException ? failure : Errors.kafkaErrorException(failure);
                    callback.onCompletion(null, reported);
                }
                finally {
                    // Re-queue the consumer for expiration whether or not the commit succeeded.
                    KafkaConsumerManager.this.updateExpiration(consumerState);
                }
            }
        };
        return this.executor.submit(commitTask);
    }

    /**
     * Returns the committed offsets of a consumer for the partitions named in {@code request}.
     *
     * @param group    consumer group
     * @param instance consumer instance name
     * @param request  partitions to query, passed through to the consumer state
     * @return the response produced by the consumer state
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public ConsumerCommittedResponse committed(String group, String instance, ConsumerCommittedRequest request) {
        log.debug("Committed offsets for consumer {} in group {}", instance, group);
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            return state.committed(request);
        }
        finally {
            // The lookup removed the instance from the expiration queue; put it back so it can
            // still expire when no read or commit follows.
            this.updateExpiration(state);
        }
    }

    /**
     * Unregisters a consumer instance and closes it.
     *
     * @param group    consumer group
     * @param instance consumer instance name
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public void deleteConsumer(String group, String instance) {
        log.debug("Destroying consumer " + instance + " in group " + group);
        // remove=true drops the instance from the registry before it is closed.
        this.getConsumerInstance(group, instance, true).close();
    }

    /**
     * Applies a subscription to a consumer instance.
     *
     * @param group        consumer group
     * @param instance     consumer instance name
     * @param subscription subscription passed through to the consumer state
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public void subscribe(String group, String instance, ConsumerSubscriptionRecord subscription) {
        log.debug("Subscribing consumer {} in group {}", instance, group);
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            state.subscribe(subscription);
        }
        finally {
            // The lookup removed the instance from the expiration queue; re-queue it so a
            // consumer that subscribes but never reads can still expire.
            this.updateExpiration(state);
        }
    }

    /**
     * Removes the current subscription of a consumer instance.
     *
     * @param group    consumer group
     * @param instance consumer instance name
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public void unsubscribe(String group, String instance) {
        log.debug("Unsubcribing consumer {} in group {}", instance, group);
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            state.unsubscribe();
        }
        finally {
            // The lookup removed the instance from the expiration queue; put it back.
            this.updateExpiration(state);
        }
    }

    /**
     * Returns the topics a consumer instance is currently subscribed to.
     *
     * @param group    consumer group
     * @param instance consumer instance name
     * @return a response whose {@code topics} is a copy of the consumer's subscription
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public ConsumerSubscriptionResponse subscription(String group, String instance) {
        ConsumerSubscriptionResponse response = new ConsumerSubscriptionResponse();
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            Set<String> topics = state.subscription();
            response.topics = new ArrayList<String>(topics);
        }
        finally {
            // The lookup removed the instance from the expiration queue; put it back.
            this.updateExpiration(state);
        }
        return response;
    }

    /**
     * Seeks a consumer instance to the beginning of the requested partitions.
     *
     * @param group         consumer group
     * @param instance      consumer instance name
     * @param seekToRequest partitions to seek, passed through to the consumer state
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public void seekToBeginning(String group, String instance, ConsumerSeekToRequest seekToRequest) {
        log.debug("seeking to beginning {} in group {}", instance, group);
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            state.seekToBeginning(seekToRequest);
        }
        finally {
            // The lookup removed the instance from the expiration queue; put it back.
            this.updateExpiration(state);
        }
    }

    /**
     * Seeks a consumer instance to the end of the requested partitions.
     *
     * @param group         consumer group
     * @param instance      consumer instance name
     * @param seekToRequest partitions to seek, passed through to the consumer state
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public void seekToEnd(String group, String instance, ConsumerSeekToRequest seekToRequest) {
        log.debug("seeking to end {} in group {}", instance, group);
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            state.seekToEnd(seekToRequest);
        }
        finally {
            // The lookup removed the instance from the expiration queue; put it back.
            this.updateExpiration(state);
        }
    }

    /**
     * Seeks a consumer instance to explicit offsets.
     *
     * @param group               consumer group
     * @param instance            consumer instance name
     * @param seekToOffsetRequest target offsets, passed through to the consumer state
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public void seekToOffset(String group, String instance, ConsumerSeekToOffsetRequest seekToOffsetRequest) {
        log.debug("seeking to offset {} in group {}", instance, group);
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            state.seekToOffset(seekToOffsetRequest);
        }
        finally {
            // The lookup removed the instance from the expiration queue; put it back.
            this.updateExpiration(state);
        }
    }

    /**
     * Manually assigns partitions to a consumer instance.
     *
     * @param group             consumer group
     * @param instance          consumer instance name
     * @param assignmentRequest partitions to assign, passed through to the consumer state
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public void assign(String group, String instance, ConsumerAssignmentRequest assignmentRequest) {
        // The previous message ("seeking to end") was copied from seekToEnd and was misleading.
        log.debug("assigning partitions to consumer {} in group {}", instance, group);
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            state.assign(assignmentRequest);
        }
        finally {
            // The lookup removed the instance from the expiration queue; put it back.
            this.updateExpiration(state);
        }
    }

    /**
     * Returns the partitions currently assigned to a consumer instance.
     *
     * @param group    consumer group
     * @param instance consumer instance name
     * @return a response listing each assigned partition as a REST entity
     * @throws RestNotFoundException if the consumer instance does not exist
     */
    public ConsumerAssignmentResponse assignment(String group, String instance) {
        log.debug("getting assignment for  {} in group {}", instance, group);
        ConsumerAssignmentResponse response = new ConsumerAssignmentResponse();
        response.partitions = new Vector<io.confluent.kafkarest.entities.TopicPartition>();
        // The lookup throws when the instance is missing, so state is never null here.
        KafkaConsumerState state = this.getConsumerInstance(group, instance);
        try {
            Set<TopicPartition> topicPartitions = state.assignment();
            // Convert Kafka's TopicPartition into the REST entity of the same simple name.
            for (TopicPartition t : topicPartitions) {
                response.partitions.add(new io.confluent.kafkarest.entities.TopicPartition(t.topic(), t.partition()));
            }
        }
        finally {
            // The lookup removed the instance from the expiration queue; put it back.
            this.updateExpiration(state);
        }
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops all read workers and the expiration thread, closes every registered consumer and
     * shuts down the executor.
     *
     * <p>The expiration thread is stopped outside the lock because its shutdown blocks until
     * the thread, which itself needs this manager's monitor, has exited.
     */
    public void shutdown() {
        log.debug("Shutting down consumers");
        synchronized (this) {
            for (KafkaConsumerWorker worker : this.workers) {
                log.trace("Shutting down worker {}", worker);
                worker.shutdown();
            }
            this.workers.clear();
        }
        log.trace("Shutting down consumer expiration thread");
        this.expirationThread.shutdown();
        synchronized (this) {
            try {
                for (KafkaConsumerState state : this.consumers.values()) {
                    // createConsumer stores a null placeholder while an instance is being
                    // built; skip those instead of failing with a NullPointerException.
                    if (state != null) {
                        state.close();
                    }
                }
            }
            finally {
                // Always release the bookkeeping and the executor, even if a close() failed.
                this.consumers.clear();
                this.consumersByExpiration.clear();
                this.executor.shutdown();
            }
        }
    }

    /**
     * Looks up a registered consumer and takes it out of the expiration queue so it cannot
     * expire while the caller is operating on it.
     *
     * @param group    consumer group
     * @param instance consumer instance name
     * @param remove   when {@code true} the instance is also removed from the registry
     * @return the consumer state, never {@code null}
     * @throws RestNotFoundException if no fully created instance is registered under that id
     */
    private synchronized KafkaConsumerState getConsumerInstance(String group, String instance, boolean remove) {
        ConsumerInstanceId id = new ConsumerInstanceId(group, instance);
        KafkaConsumerState state = this.consumers.get(id);
        if (state == null) {
            // Either unknown, or only reserved by a createConsumer call still in progress.
            // Do not touch the map here: removing a reservation would let a second create
            // for the same id slip through while the first one is still running.
            throw Errors.consumerInstanceNotFoundException();
        }
        if (remove) {
            this.consumers.remove(id);
        }
        this.consumersByExpiration.remove(state);
        return state;
    }

    /**
     * Looks up a consumer without unregistering it; shorthand for the three-argument
     * overload with {@code remove = false}.
     */
    private KafkaConsumerState getConsumerInstance(String group, String instance) {
        final boolean keepRegistered = false;
        return this.getConsumerInstance(group, instance, keepRegistered);
    }

    /**
     * Refreshes a consumer's expiration and (re-)queues it for expiration tracking, then
     * wakes the expiration thread so it can recompute how long to sleep.
     *
     * @param state consumer whose operation has just finished
     */
    private synchronized void updateExpiration(KafkaConsumerState state) {
        // Overlapping operations on the same consumer can leave it in the queue already.
        // Remove it before its expiration changes (presumably the queue's ordering key) so the
        // heap is never reordered behind its back and the consumer is never queued twice.
        this.consumersByExpiration.remove(state);
        state.updateExpiration();
        this.consumersByExpiration.add(state);
        this.notifyAll();
    }

    /**
     * Daemon thread that removes expired consumers from the manager and closes them on the
     * manager's executor. It sleeps on the manager's monitor and is woken whenever the
     * expiration queue changes.
     */
    private class ExpirationThread
    extends Thread {
        private final AtomicBoolean isRunning;
        /** Released when {@link #run()} exits, for whatever reason. */
        private final CountDownLatch shutdownLatch;

        public ExpirationThread() {
            super("Consumer Expiration Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            this.setDaemon(true);
        }

        @Override
        public void run() {
            try {
                synchronized (KafkaConsumerManager.this) {
                    try {
                        while (this.isRunning.get()) {
                            long now = KafkaConsumerManager.this.time.milliseconds();
                            while (!KafkaConsumerManager.this.consumersByExpiration.isEmpty() && KafkaConsumerManager.this.consumersByExpiration.peek().expired(now)) {
                                final KafkaConsumerState state = KafkaConsumerManager.this.consumersByExpiration.remove();
                                KafkaConsumerManager.this.consumers.remove(state.getId());
                                // Close off-thread so a slow close does not hold the manager lock.
                                KafkaConsumerManager.this.executor.submit(new Runnable(){

                                    @Override
                                    public void run() {
                                        state.close();
                                    }
                                });
                            }
                            long timeout = KafkaConsumerManager.this.consumersByExpiration.isEmpty() ? Long.MAX_VALUE : KafkaConsumerManager.this.consumersByExpiration.peek().untilExpiration(now);
                            // Object.wait(0) blocks until notified and a negative value throws;
                            // clamp so an edge-case result from untilExpiration cannot stall or
                            // kill this thread.
                            KafkaConsumerManager.this.wait(Math.max(1L, timeout));
                        }
                    }
                    catch (InterruptedException ignored) {
                        // shutdown() interrupts this thread to end the wait; just exit.
                    }
                }
            }
            finally {
                // Count down even on an unexpected exception so shutdown() can never hang.
                this.shutdownLatch.countDown();
            }
        }

        /**
         * Signals the thread to stop and blocks until it has exited.
         *
         * @throws Error if the calling thread is interrupted while waiting
         */
        public void shutdown() {
            try {
                this.isRunning.set(false);
                this.interrupt();
                this.shutdownLatch.await();
            }
            catch (InterruptedException e) {
                // Preserve the caller's interrupt status and keep the cause for diagnosis.
                Thread.currentThread().interrupt();
                throw new Error("Interrupted when shutting down consumer worker thread.", e);
            }
        }
    }

    public static interface KafkaConsumerFactory {
        public Consumer createConsumer(Properties var1);
    }

    public static interface CommitCallback {
        public void onCompletion(List<TopicPartitionOffset> var1, Exception var2);
    }

    public static interface ReadCallback<K, V> {
        public void onCompletion(List<? extends ConsumerRecord<K, V>> var1, Exception var2);
    }
}

