/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.ConsumerState;
import io.confluent.kafkarest.ConsumerWorkerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.entities.AbstractConsumerRecord;
import io.confluent.rest.exceptions.RestException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A single consumer read request, executed incrementally through repeated calls to
 * {@link #doPartialRead()} and exposed to callers as a {@link Future}.
 *
 * <p>Records are accumulated in {@code messages} until either the response byte budget
 * ({@code maxResponseBytes}) is exhausted or the request timeout elapses, at which point
 * {@link #finish()} records the consumed offsets on the parent {@link ConsumerState}, invokes
 * the completion callback and releases anyone blocked in {@link #get()}.
 *
 * <p>If a task fails after it has already accumulated records, it registers itself on the
 * parent as the "failed task"; the next task created for the same parent picks up those
 * records (and their byte count) in its constructor so they are not lost.
 *
 * <p>NOTE(review): no synchronization is visible in this class; it presumably relies on the
 * caller to invoke {@code doPartialRead()}/{@code finish()} from a single worker thread —
 * confirm against the worker implementation.
 */
class ConsumerReadTask<KafkaK, KafkaV, ClientK, ClientV>
implements Future<List<AbstractConsumerRecord<ClientK, ClientV>>> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerReadTask.class);
    private final ConsumerState parent;
    // Effective response size limit: the smaller of the caller's limit and the configured one.
    private final long maxResponseBytes;
    private final ConsumerWorkerReadCallback<ClientK, ClientV> callback;
    // Counted down exactly when the task finishes (successfully or not).
    private final CountDownLatch finished;
    // Iterator over the most recent poll() result; null until the first poll.
    private Iterator<ConsumerRecord<KafkaK, KafkaV>> iter;
    // Null until the first doPartialRead() call has performed its one-time setup.
    private Consumer<KafkaK, KafkaV> consumer;
    // Records accumulated for the response; may be pre-populated from a previously failed task.
    private List<AbstractConsumerRecord<ClientK, ClientV>> messages;
    private List<AbstractConsumerRecord<ClientK, ClientV>> extraMessages;
    private long bytesConsumed = 0L;
    // Time (parent's clock, ms) at which this task was created.
    private final long started;
    // True between parent.startRead() and the matching parent.finishRead().
    private boolean readStarted = false;
    // Earliest of the next backoff expiry and the overall request expiry (ms, parent's clock).
    long waitExpiration;

    /**
     * Creates the task and makes sure the parent consumer is subscribed to exactly
     * {@code topic}.
     *
     * <p>If the parent is not yet subscribed, a subscription to {@code topic} is attempted. If
     * it is subscribed to anything other than exactly {@code topic}, the task is finished
     * immediately with the "consumer already subscribed" error. Any {@link RestException}
     * raised during this setup finishes the task (the callback is invoked with the exception)
     * instead of propagating out of the constructor.
     *
     * @param parent   consumer state this read operates on
     * @param topic    the topic to read from
     * @param maxBytes caller-requested response size limit; capped by the
     *                 {@code consumer.request.max.bytes} setting
     * @param callback invoked once when the task finishes
     */
    public ConsumerReadTask(ConsumerState parent, String topic, long maxBytes, ConsumerWorkerReadCallback<ClientK, ClientV> callback) {
        this.parent = parent;
        this.maxResponseBytes = Math.min(maxBytes, parent.getConfig().getLong("consumer.request.max.bytes"));
        this.callback = callback;
        this.finished = new CountDownLatch(1);
        this.started = parent.getConfig().getTime().milliseconds();
        try {
            if (!parent.isSubscribed()) {
                parent.tryToSubscribeByTopicList(Collections.singletonList(topic));
            } else {
                HashSet<String> actual = new HashSet<String>();
                actual.add(topic);
                if (!parent.getSubscribedTopics().equals(actual)) {
                    throw Errors.consumerAlreadySubscribedException();
                }
            }
            // Restore records a previous, failed task had already read so they are returned
            // by this task rather than lost.
            ConsumerReadTask previousTask = parent.clearFailedTask();
            if (previousTask != null) {
                this.messages = previousTask.messages;
                this.bytesConsumed = previousTask.bytesConsumed;
            }
        }
        catch (RestException e) {
            this.finish(e);
        }
    }

    /**
     * Converts {@code record} and appends it to the response if it still fits in the byte
     * budget.
     *
     * @return {@code true} if the record was added; {@code false} if adding it would exceed
     *         {@code maxResponseBytes} (nothing is modified in that case)
     */
    private boolean processConsumerRecord(ConsumerRecord<KafkaK, KafkaV> record) {
        ConsumerRecordAndSize recordAndSize = this.parent.convertConsumerRecord(record);
        long roughMsgSize = recordAndSize.getSize();
        if (this.bytesConsumed + roughMsgSize > this.maxResponseBytes) {
            return false;
        }
        this.messages.add(recordAndSize.getRecord());
        this.bytesConsumed += roughMsgSize;
        return true;
    }

    /**
     * Performs one slice of the read.
     *
     * <p>On the first call the read is started on the parent, the consumer is obtained and any
     * records left in the parent's queue are moved into the response as far as they fit. Every
     * call then polls the consumer (with the {@code consumer.iterator.backoff.ms} timeout) and
     * adds records until one of the following happens:
     * <ul>
     *   <li>a record does not fit: it and the rest of the polled batch are pushed onto the
     *       parent's queue, the task is finished and {@code false} is returned;</li>
     *   <li>a poll returns no records or the consumer is woken up: {@code true} is returned
     *       (and the task is finished as well if the request timeout has elapsed);</li>
     *   <li>the request timeout measured from the start of this call elapses: the task is
     *       finished and {@code false} is returned;</li>
     *   <li>any other exception is thrown: the task is finished with that exception and
     *       {@code false} is returned.</li>
     * </ul>
     *
     * <p>{@link #waitExpiration} is updated to the earlier of the backoff expiry for this call
     * and the overall request expiry.
     *
     * @return {@code true} if the caller should back off before calling again — presumably
     *         until {@code waitExpiration}; confirm against the worker loop
     */
    public boolean doPartialRead() {
        try {
            boolean backoff = false;
            long startedIteration = this.parent.getConfig().getTime().milliseconds();
            int requestTimeoutMs = this.parent.getConfig().getInt("consumer.request.timeout.ms");
            long endTime = startedIteration + (long) requestTimeoutMs;
            int itBackoff = this.parent.getConfig().getInt("consumer.iterator.backoff.ms");
            if (this.consumer == null) {
                // One-time setup on the first call.
                this.parent.startRead();
                this.readStarted = true;
                this.consumer = this.parent.getConsumer();
                // Keep records restored from a previously failed task; bytesConsumed already
                // accounts for them, so replacing the list here would silently drop them.
                if (this.messages == null) {
                    this.messages = new ArrayList<AbstractConsumerRecord<ClientK, ClientV>>();
                }
                // Drain records an earlier read had to put back because they did not fit.
                // NOTE(review): a queued record that does not fit is skipped while later,
                // smaller ones may still be taken, which can deliver records out of order —
                // confirm whether this is intended.
                Queue queuedRecords = this.parent.queue();
                Iterator it = queuedRecords.iterator();
                while (it.hasNext()) {
                    if (this.processConsumerRecord((ConsumerRecord)it.next())) {
                        it.remove();
                    }
                }
                this.waitExpiration = 0L;
            }
            try {
                while (this.parent.getConfig().getTime().milliseconds() < endTime) {
                    if (this.iter == null || !this.iter.hasNext()) {
                        ConsumerRecords<KafkaK, KafkaV> records = this.consumer.poll((long) itBackoff);
                        this.iter = records.iterator();
                        if (!this.iter.hasNext()) {
                            // Nothing available right now; let the caller back off.
                            backoff = true;
                            break;
                        }
                    }
                    while (this.iter.hasNext()) {
                        ConsumerRecord<KafkaK, KafkaV> record = this.iter.next();
                        if (this.processConsumerRecord(record)) {
                            continue;
                        }
                        // Response is full: park this record and the rest of the batch on the
                        // parent's queue for a later read, then complete the request.
                        this.parent.queue().add(record);
                        while (this.iter.hasNext()) {
                            this.parent.queue().add(this.iter.next());
                        }
                        this.finish();
                        return false;
                    }
                }
            }
            catch (WakeupException cte) {
                // poll() was interrupted via wakeup; treat it like an empty poll.
                backoff = true;
            }
            long now = this.parent.getConfig().getTime().milliseconds();
            long elapsed = now - this.started;
            long backoffExpiration = startedIteration + (long) itBackoff;
            long requestExpiration = this.started + (long) requestTimeoutMs;
            this.waitExpiration = Math.min(backoffExpiration, requestExpiration);
            if (elapsed >= (long) requestTimeoutMs) {
                this.finish();
            }
            return backoff;
        }
        catch (Exception e) {
            this.finish(e);
            log.error("Unexpected exception in consumer read thread: ", e);
            return false;
        }
    }

    /** Finishes the task successfully; equivalent to {@code finish(null)}. */
    public void finish() {
        this.finish(null);
    }

    /**
     * Completes the task.
     *
     * <p>On success ({@code e == null}) the offset of every accumulated record is stored in the
     * parent's consumed-offsets map. On failure, if records were already accumulated, this
     * task is registered on the parent as the failed task so a later task can recover them.
     * In both cases an in-progress read is ended on the parent, the callback is invoked
     * (with the records on success, with {@code null} records and the exception on failure)
     * and the completion latch is released. Exceptions thrown by the callback are logged and
     * not propagated.
     *
     * @param e the failure that ended the task, or {@code null} on success
     */
    public void finish(Exception e) {
        if (e == null) {
            // finish() may be called before the first doPartialRead() created the list;
            // complete with an empty result instead of failing with a NullPointerException.
            if (this.messages == null) {
                this.messages = new ArrayList<AbstractConsumerRecord<ClientK, ClientV>>();
            }
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = this.parent.getConsumedOffsets();
            for (AbstractConsumerRecord<ClientK, ClientV> msg : this.messages) {
                TopicPartition topicPartition = new TopicPartition(msg.getTopic(), msg.getPartition());
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(msg.getOffset(), "");
                consumedOffsets.put(topicPartition, offsetAndMetadata);
            }
        } else if (this.messages != null && this.messages.size() > 0) {
            this.parent.setFailedTask(this);
        }
        if (this.readStarted) {
            this.parent.finishRead();
            this.readStarted = false;
        }
        try {
            this.callback.onCompletion(e == null ? this.messages : null, e);
        }
        catch (Throwable t) {
            // Log the callback's own failure (t), not the task's exception, which is
            // typically null here and would hide the actual cause.
            log.error("Consumer read callback threw an unhandled exception", t);
        }
        this.finished.countDown();
    }

    /** Cancellation is not supported; always returns {@code false}. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    /** Always {@code false}, since the task cannot be cancelled. */
    @Override
    public boolean isCancelled() {
        return false;
    }

    /** Returns {@code true} once {@link #finish(Exception)} has released the latch. */
    @Override
    public boolean isDone() {
        return this.finished.getCount() == 0L;
    }

    /**
     * Blocks until the task has finished and returns the accumulated records.
     *
     * <p>The records are returned whether the task succeeded or failed; the failure itself is
     * only reported through the callback.
     */
    @Override
    public List<AbstractConsumerRecord<ClientK, ClientV>> get() throws InterruptedException, ExecutionException {
        this.finished.await();
        return this.messages;
    }

    /**
     * Waits up to the given timeout for the task to finish and returns the accumulated records.
     *
     * <p>NOTE(review): per this file's imports, {@code TimeoutException} here is
     * {@code org.apache.kafka.common.errors.TimeoutException}, not
     * {@code java.util.concurrent.TimeoutException}; callers catching the latter will not see
     * it. Left unchanged to keep the exception type callers may already rely on.
     *
     * @throws TimeoutException if the task has not finished when the wait ends
     */
    @Override
    public List<AbstractConsumerRecord<ClientK, ClientV>> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.finished.await(timeout, unit)) {
            throw new TimeoutException();
        }
        return this.messages;
    }
}

