/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.ConsumerWorkerReadCallback;
import io.confluent.kafkarest.entities.AbstractConsumerRecord;
import io.confluent.kafkarest.v2.KafkaConsumerState;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State for a single read request against a {@link KafkaConsumerState}, exposed as a
 * {@link Future} over the list of records collected so far.
 *
 * <p>The read is performed incrementally: each call to {@link #doPartialRead()} drains whatever
 * records the parent currently has available (up to the byte budget) and then decides whether
 * the task is complete. Completion invokes the supplied callback and releases any thread blocked
 * in {@link #get()}.
 *
 * <p>Cancellation is not supported: {@link #cancel(boolean)} always returns {@code false}.
 *
 * @param <KafkaKeyT> key type used by the parent consumer state
 * @param <KafkaValueT> value type used by the parent consumer state
 * @param <ClientKeyT> key type of the records handed to the callback
 * @param <ClientValueT> value type of the records handed to the callback
 */
class KafkaConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>
implements Future<List<AbstractConsumerRecord<ClientKeyT, ClientValueT>>> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerReadTask.class);

    private final KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
    private final long requestTimeoutMs;
    private final long maxResponseBytes;
    private final ConsumerWorkerReadCallback<ClientKeyT, ClientValueT> callback;
    // Reaches zero exactly when the task has finished (successfully or with an error).
    private final CountDownLatch finished;
    // NOTE(review): not referenced anywhere in this class; kept so the field set is unchanged.
    private Iterator<ConsumerRecord<ClientKeyT, ClientValueT>> iter;
    // Lazily created on the first doPartialRead() call; null until then.
    private List<AbstractConsumerRecord<ClientKeyT, ClientValueT>> messages;
    private long bytesConsumed = 0L;
    // Timestamp (ms, from the config's Time source) at which this task was constructed.
    private final long started;
    // Earliest of "iterator backoff elapsed" and "request timed out", updated by every partial
    // read. Package-private; presumably consulted by the scheduling code - verify against caller.
    long waitExpiration;

    /**
     * Creates a read task bound to the given consumer state.
     *
     * @param parent consumer state the records are read from; also supplies configuration
     * @param timeout requested timeout in ms; values {@code <= 0} select the configured
     *     {@code consumer.request.timeout.ms}, larger values are capped by it
     * @param maxBytes requested response size limit; capped by the configured
     *     {@code consumer.request.max.bytes}
     * @param callback invoked once when the task finishes
     */
    public KafkaConsumerReadTask(KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent, long timeout, long maxBytes, ConsumerWorkerReadCallback<ClientKeyT, ClientValueT> callback) {
        this.parent = parent;
        this.maxResponseBytes = Math.min(maxBytes, parent.getConfig().getLong("consumer.request.max.bytes"));
        long defaultRequestTimeout = parent.getConfig().getInt("consumer.request.timeout.ms");
        this.requestTimeoutMs = timeout <= 0L ? defaultRequestTimeout : Math.min(timeout, defaultRequestTimeout);
        this.callback = callback;
        this.finished = new CountDownLatch(1);
        this.started = parent.getConfig().getTime().milliseconds();
    }

    /**
     * Performs one increment of the read: collects currently available records while they fit
     * in the byte budget, recomputes {@code waitExpiration}, and finishes the task if the
     * request timed out or the byte budget is exhausted.
     *
     * @return {@code true} if the iteration completed without an exception (whether or not the
     *     task finished); {@code false} if an exception occurred, in which case the task has
     *     been finished with that exception
     */
    public boolean doPartialRead() {
        try {
            if (this.messages == null) {
                // First iteration for this task: let the parent know a read is starting.
                this.parent.startRead();
                this.messages = new Vector<AbstractConsumerRecord<ClientKeyT, ClientValueT>>();
            }

            // After the loop this holds either the size of the record that did not fit, the
            // size of the last record that was consumed, or 0 if no record was seen at all.
            long roughMsgSize = 0L;
            long startedIteration = this.parent.getConfig().getTime().milliseconds();
            while (this.parent.hasNext()) {
                ConsumerRecordAndSize<ClientKeyT, ClientValueT> recordAndSize =
                        this.parent.createConsumerRecord(this.parent.peek());
                roughMsgSize = recordAndSize.getSize();
                if (this.bytesConsumed + roughMsgSize >= this.maxResponseBytes) {
                    // Record is only peeked, not consumed, so it stays with the parent.
                    break;
                }
                this.messages.add(recordAndSize.getRecord());
                this.parent.next();
                this.bytesConsumed += roughMsgSize;
            }
            log.trace("KafkaConsumerReadTask exiting read with id={} messages={} bytes={}, backing off if not complete", this, this.messages.size(), this.bytesConsumed);

            long now = this.parent.getConfig().getTime().milliseconds();
            long elapsed = now - this.started;
            int itbackoff = this.parent.getConfig().getInt("consumer.iterator.backoff.ms");
            long backoffExpiration = startedIteration + (long)itbackoff;
            long requestExpiration = this.started + this.requestTimeoutMs;
            this.waitExpiration = Math.min(backoffExpiration, requestExpiration);

            boolean requestTimedOut = elapsed >= this.requestTimeoutMs;
            // NOTE(review): when the loop ended because the parent ran dry, roughMsgSize is the
            // size of a record already included in bytesConsumed, so it is counted twice here.
            // Left as-is to preserve existing completion behavior - confirm whether intended.
            boolean exceededMaxResponseBytes = this.bytesConsumed + roughMsgSize >= this.maxResponseBytes;
            if (requestTimedOut || exceededMaxResponseBytes) {
                log.trace("Finishing KafkaConsumerReadTask id={} requestTimedOut={} exceededMaxResponseBytes={}", this, requestTimedOut, exceededMaxResponseBytes);
                this.finish();
            }
            return true;
        }
        catch (Exception e) {
            this.finish(e);
            log.error("Unexpected exception in consumer read task id={} ", this, e);
            return false;
        }
    }

    /** Finishes the task successfully, delivering the collected records to the callback. */
    void finish() {
        this.finish(null);
    }

    /**
     * Finishes the task: notifies the parent, invokes the callback and releases waiters.
     *
     * @param e the failure that ended the read, or {@code null} on success; when non-null the
     *     callback receives {@code null} instead of the collected records
     */
    void finish(Exception e) {
        log.trace("Finishing KafkaConsumerReadTask id={} exception={}", this, e);
        try {
            this.parent.finishRead();
            try {
                this.callback.onCompletion(e == null ? this.messages : null, e);
            }
            catch (Throwable t) {
                // Log the exception the callback actually threw, not the (often null) cause
                // that was passed into finish().
                log.error("Consumer read callback threw an unhandled exception id={}", this, t);
            }
        }
        finally {
            // Always release waiters, even if finishRead() throws; otherwise get() would block
            // forever. Counting down an already-zero latch is a no-op.
            this.finished.countDown();
        }
    }

    /** Cancellation is not supported; always returns {@code false}. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    /** Always {@code false}, since the task can never be cancelled. */
    @Override
    public boolean isCancelled() {
        return false;
    }

    /** Returns {@code true} once {@code finish} has run. */
    @Override
    public boolean isDone() {
        return this.finished.getCount() == 0L;
    }

    /**
     * Blocks until the task has finished and returns the collected records.
     *
     * @return the records collected by this task; may be {@code null} if the task finished
     *     before any partial read created the list
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public List<AbstractConsumerRecord<ClientKeyT, ClientValueT>> get() throws InterruptedException, ExecutionException {
        this.finished.await();
        return this.messages;
    }

    /**
     * Blocks for at most the given time until the task has finished.
     *
     * @return the records collected by this task (see {@link #get()})
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the task did not finish within the given time
     */
    @Override
    public List<AbstractConsumerRecord<ClientKeyT, ClientValueT>> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // await() returns false when the timeout elapsed before the latch reached zero.
        if (!this.finished.await(timeout, unit)) {
            throw new TimeoutException();
        }
        return this.messages;
    }
}

