package org.apache.drill.exec.work.batch;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/batch/BaseRawBatchBuffer.class */
/**
 * Skeleton {@link RawBatchBuffer} that tracks the buffer life-cycle ({@link BufferState}), counts down
 * the incoming streams that have not yet delivered their last batch, and drains/releases queued batches
 * when the buffer is killed or closed.
 *
 * <p>Storage is delegated to {@link #bufferQueue}. That field is never assigned in this class, so it must
 * be set elsewhere (presumably by the concrete subclass) before any method here touches it.
 *
 * <p>{@link #enqueue(RawFragmentBatch)} and {@link #kill(FragmentContext)} are {@code synchronized} on this
 * instance; {@link #getNext()} and {@link #close()} are not.
 *
 * @param <T> element type accepted by {@link BufferQueue#add(Object)}
 */
public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
    private static final Logger logger = LoggerFactory.getLogger(BaseRawBatchBuffer.class);

    /** {@link #getNext()} clears {@link #outOfMemory} once the queue holds fewer batches than this. */
    private static final int OUT_OF_MEMORY_CLEAR_QUEUE_SIZE = 10;

    /** How long a single blocking poll in {@link #getNext()} waits before re-checking the executor state. */
    private static final long POLL_TIMEOUT_SECONDS = 5L;

    /** Backing queue; not assigned in this class. */
    protected BufferQueue<T> bufferQueue;

    /** Value of {@link ExecConstants#INCOMING_BUFFER_SIZE} read from the fragment's config at construction. */
    protected final int bufferSizePerSocket;

    /** Streams that have not yet delivered a last batch; starts at {@link #fragmentCount}. */
    private int streamCounter;

    private final int fragmentCount;

    protected final FragmentContext context;

    private volatile BufferState state = BufferState.INIT;

    /**
     * Set by {@link #getNext()} when the context's allocator reports it is over its limit; cleared by
     * {@link #getNext()} when the queue is short again.
     */
    protected final AtomicBoolean outOfMemory = new AtomicBoolean(false);

    /**
     * Queue abstraction this buffer reads from. Only {@link #poll()}, {@link #poll(long, TimeUnit)},
     * {@link #size()} and {@link #isEmpty()} are called by the enclosing class; the remaining methods are
     * for implementors/subclasses and their semantics are not visible here.
     *
     * <p>NOTE(review): the decompiler reports this type was originally declared {@code protected}.
     *
     * @param <T> element type accepted by {@link #add(Object)}
     */
    public interface BufferQueue<T> {
        /** Semantics defined by the implementation (name suggests an out-of-memory path — confirm). */
        void addOomBatch(RawFragmentBatch rawFragmentBatch);

        /** Returns the next batch; callers in this file treat {@code null} as "nothing available now". */
        RawFragmentBatch poll() throws IOException, InterruptedException;

        /** Returns the next batch; not called by the enclosing class. */
        RawFragmentBatch take() throws IOException, InterruptedException;

        /**
         * Returns the next batch, given a timeout; callers in this file treat {@code null} as "nothing
         * arrived".
         *
         * @param j        timeout amount
         * @param timeUnit unit of {@code j}
         */
        RawFragmentBatch poll(long j, TimeUnit timeUnit) throws InterruptedException, IOException;

        /** Semantics defined by the implementation; not called by the enclosing class. */
        boolean checkForOutOfMemory();

        /** Number of queued entries. */
        int size();

        /** Whether the queue currently has no entries. */
        boolean isEmpty();

        /** Adds an element to the queue. */
        void add(T t);
    }

    /**
     * Life-cycle of the buffer.
     *
     * <p>NOTE(review): the decompiler reports this type was originally declared {@code private}.
     */
    public enum BufferState {
        /** Initial state assigned at construction. */
        INIT,
        /** Set once the stream counter reaches zero, unless the buffer was already killed. */
        STREAMS_FINISHED,
        /** Set by {@link BaseRawBatchBuffer#kill(FragmentContext)}. */
        KILLED
    }

    /**
     * @param fragmentContext context of the owning fragment; its config supplies the buffer size
     * @param i               number of incoming streams; used both as the fragment count and as the initial
     *                        value of the remaining-stream counter
     */
    public BaseRawBatchBuffer(FragmentContext fragmentContext, int i) {
        this.bufferSizePerSocket = fragmentContext.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
        this.fragmentCount = i;
        this.streamCounter = i;
        this.context = fragmentContext;
    }

    /** @return the stream count passed to the constructor */
    protected int getFragmentCount() {
        return this.fragmentCount;
    }

    /**
     * Accepts an incoming batch. If the executor state says the fragment should stop, the buffer is killed
     * first. A batch arriving at a killed buffer is released and acknowledged without being queued.
     *
     * @throws IOException if the buffer already finished normally (all streams done), or if
     *                     {@link #enqueueInner(RawFragmentBatch)} fails
     */
    @Override // org.apache.drill.exec.work.batch.RawBatchBuffer
    public synchronized void enqueue(RawFragmentBatch rawFragmentBatch) throws IOException {
        if (this.context != null && !this.context.getExecutorState().shouldContinue()) {
            kill(this.context);
        }

        if (!isTerminated()) {
            enqueueInner(rawFragmentBatch);
            return;
        }

        if (this.state != BufferState.KILLED) {
            throw new IOException("Attempted to enqueue batch after finished");
        }
        // Killed: nobody will consume the batch, so drop it but still ack the sender.
        rawFragmentBatch.release();
        rawFragmentBatch.sendOk();
    }

    /** Stores the batch; called by {@link #enqueue(RawFragmentBatch)} only while the buffer is not terminated. */
    protected abstract void enqueueInner(RawFragmentBatch rawFragmentBatch) throws IOException;

    /**
     * Releases whatever is still queued. Any leftover batch while the executor is still supposed to continue
     * is reported to the executor state as a failure before the queue is drained.
     *
     * @throws IllegalStateException if the buffer is not terminated while the executor should still continue
     */
    @Override // org.apache.drill.exec.record.RawFragmentBatchProvider, java.lang.AutoCloseable
    public void close() {
        if (!isTerminated() && this.context.getExecutorState().shouldContinue()) {
            throw new IllegalStateException(String.format(
                    "Cleanup before finished. %d out of %d streams have finished",
                    completedStreams(), this.fragmentCount));
        }

        if (this.bufferQueue.isEmpty()) {
            return;
        }

        if (this.context.getExecutorState().shouldContinue()) {
            this.context.getExecutorState().fail(new IllegalStateException("Batches still in queue during cleanup"));
            logger.error("{} Batches in queue.", this.bufferQueue.size());
        }
        clearBufferWithBody();
    }

    /**
     * Marks the buffer {@link BufferState#KILLED} and releases every queued batch body.
     *
     * @param fragmentContext unused by this implementation
     */
    @Override // org.apache.drill.exec.record.RawFragmentBatchProvider
    public synchronized void kill(FragmentContext fragmentContext) {
        this.state = BufferState.KILLED;
        clearBufferWithBody();
    }

    /**
     * Polls the queue until it reports empty, releasing the body of each batch obtained. Failures from
     * {@code poll()} are reported to the executor state and the loop keeps going; an interrupt is
     * additionally re-asserted on the current thread.
     */
    private void clearBufferWithBody() {
        while (!this.bufferQueue.isEmpty()) {
            RawFragmentBatch batch = null;
            try {
                batch = this.bufferQueue.poll();
                assertAckSent(batch);
            } catch (IOException e) {
                this.context.getExecutorState().fail(e);
            } catch (InterruptedException e) {
                this.context.getExecutorState().fail(e);
                // Keep the interrupt visible to code further up the stack.
                Thread.currentThread().interrupt();
            } finally {
                // Runs on success, on the handled failures above, and before any unchecked error propagates.
                releaseBody(batch);
            }
        }
    }

    /** Releases the batch's body if both the batch and its body are non-null. */
    private static void releaseBody(RawFragmentBatch batch) {
        if (batch != null && batch.getBody() != null) {
            batch.getBody().release();
        }
    }

    /**
     * Moves to {@link BufferState#STREAMS_FINISHED} unless already killed.
     *
     * @throws IllegalStateException if batches remain queued
     */
    private void allStreamsFinished() {
        if (this.state != BufferState.KILLED) {
            this.state = BufferState.STREAMS_FINISHED;
        }
        if (!this.bufferQueue.isEmpty()) {
            throw new IllegalStateException("buffer not empty when finished");
        }
    }

    /**
     * Returns the next batch, blocking in bounded polls while the buffer is neither terminated nor drained.
     *
     * @return the next batch, or {@code null} when the buffer is terminated and empty, or when the wait was
     *         cut short because the executor should no longer continue
     * @throws IOException           if the underlying queue fails
     * @throws DrillRuntimeException if the thread is interrupted while the executor should still continue
     * @throws IllegalStateException if {@code null} would be returned while batches remain or the buffer is
     *                               not terminated
     */
    @Override // org.apache.drill.exec.record.RawFragmentBatchProvider
    public RawFragmentBatch getNext() throws IOException {
        if (this.outOfMemory.get() && this.bufferQueue.size() < OUT_OF_MEMORY_CLEAR_QUEUE_SIZE) {
            this.outOfMemory.set(false);
        }

        RawFragmentBatch batch;
        try {
            batch = this.bufferQueue.poll();
            if (batch == null && (!isTerminated() || !this.bufferQueue.isEmpty())) {
                batch = awaitBatch();
            }
        } catch (InterruptedException e) {
            if (this.context.getExecutorState().shouldContinue()) {
                throw new DrillRuntimeException("Interrupted but context.shouldContinue() is true", e);
            }
            kill(this.context);
            // Keep the interrupt visible to code further up the stack.
            Thread.currentThread().interrupt();
            return null;
        }

        if (this.context.getAllocator().isOverLimit()) {
            this.outOfMemory.set(true);
        }

        if (batch != null) {
            upkeep(batch);
            if (batch.getHeader().getIsLastBatch()) {
                logger.debug("Got last batch from {}:{}",
                        batch.getHeader().getSendingMajorFragmentId(),
                        batch.getHeader().getSendingMinorFragmentId());
                if (decrementStreamCounter() == 0) {
                    logger.debug("Streams finished");
                    allStreamsFinished();
                }
            }
        } else {
            if (!this.bufferQueue.isEmpty()) {
                throw new IllegalStateException("Returning null when there are batches left in queue");
            }
            if (!isTerminated()) {
                throw new IllegalStateException("Returning null when not finished");
            }
        }

        assertAckSent(batch);
        return batch;
    }

    /**
     * Polls with a timeout until a batch arrives or the executor state says to stop.
     *
     * <p>When the executor should no longer continue, the buffer is killed, any batch just obtained is
     * released, and the wait ends with {@code null}. Ending the wait here is essential: without it the loop
     * would keep polling forever, because the result is always {@code null} on this path.
     *
     * @return the batch obtained, or {@code null} if the wait was abandoned
     */
    private RawFragmentBatch awaitBatch() throws IOException, InterruptedException {
        while (true) {
            RawFragmentBatch batch = this.bufferQueue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!this.context.getExecutorState().shouldContinue()) {
                kill(this.context);
                if (batch != null) {
                    assertAckSent(batch);
                    releaseBody(batch);
                }
                return null;
            }
            if (batch != null) {
                return batch;
            }
        }
    }

    /** With assertions enabled, fails if a non-null batch has not had its ack sent. */
    private void assertAckSent(RawFragmentBatch rawFragmentBatch) {
        assert rawFragmentBatch == null || rawFragmentBatch.isAckSent() : "Ack not sent for batch";
    }

    /** Decrements the remaining-stream counter and returns the new value. */
    private int decrementStreamCounter() {
        this.streamCounter--;
        return this.streamCounter;
    }

    /** @return how many streams have already delivered their last batch */
    private int completedStreams() {
        return this.fragmentCount - this.streamCounter;
    }

    /** Hook invoked by {@link #getNext()} on every non-null batch before it is returned. */
    protected abstract void upkeep(RawFragmentBatch rawFragmentBatch);

    /** @return {@code true} once the buffer is killed or all streams have finished */
    protected boolean isTerminated() {
        return this.state == BufferState.KILLED || this.state == BufferState.STREAMS_FINISHED;
    }
}
