package com.mapr.ojai.store.impl;

import com.google.common.base.Preconditions;
import com.mapr.db.impl.OjaiQueryProperties;
import com.mapr.ojai.store.impl.results.OjaiResultProcessor;
import com.mapr.ojai.store.impl.results.QueryResultProcessor;
import com.mapr.ojai.store.impl.results.SqlResultProcessor;
import com.mapr.utils.StackTrace;
import io.netty.buffer.DrillBuf;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.rpc.user.clusterclient.DrillSession;
import org.ojai.Document;
import org.ojai.DocumentListener;
import org.ojai.exceptions.OjaiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream.class */
public class DrillDocumentStream extends QueryDocumentStream {
    private final SharedDrillSession sharedDrillSession;
    private final OjaiConnection ojaiConnection;
    private final QueryContext queryContext;
    private final QueryResultProcessor resultProcessor;
    private UserBitShared.QueryId queryId;
    private volatile boolean ignoreData;
    private final String sql;
    private final long timeoutMs;
    private volatile boolean queryCompleted;
    private final int id;
    private DelegatingResultsListener decoupler;
    private DrillHandoff drillHandoff;
    private CountDownLatch streamToLatch;
    private static final Logger logger = LoggerFactory.getLogger(DrillDocumentStream.class);
    private static final AtomicInteger idGenerator = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$DocumentBatch.class */
    public static class DocumentBatch {
        private final Iterator<Document> docIter;

        public DocumentBatch(List<Document> list) {
            this.docIter = list == null ? null : list.iterator();
        }

        public boolean hasNext() {
            if (this.docIter == null) {
                return false;
            }
            return this.docIter.hasNext();
        }

        public Document next() {
            if (hasNext()) {
                return this.docIter.next();
            }
            throw new NoSuchElementException();
        }
    }

    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$DocumentIterator.class */
    private class DocumentIterator implements Iterator<Document> {
        private final Iterator<DocumentBatch> msgIter;
        private DocumentBatch documentBatch;

        public DocumentIterator(DrillHandoff drillHandoff) {
            this.msgIter = drillHandoff.iterator();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            while (true) {
                if (this.documentBatch == null) {
                    if (!this.msgIter.hasNext()) {
                        return false;
                    }
                    this.documentBatch = this.msgIter.next();
                }
                if (this.documentBatch.hasNext()) {
                    return true;
                }
                this.documentBatch = null;
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public Document next() {
            if (hasNext()) {
                return this.documentBatch.next();
            }
            throw new NoSuchElementException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$DocumentResultsListener.class */
    public class DocumentResultsListener implements UserResultsListener {
        private final RecordBatchLoader loader;
        private final DrillMessageListener drillMessageListener;

        public DocumentResultsListener(BufferAllocator bufferAllocator, DrillMessageListener drillMessageListener) {
            this.loader = new RecordBatchLoader(bufferAllocator);
            this.drillMessageListener = drillMessageListener;
        }

        public void queryIdArrived(UserBitShared.QueryId queryId) {
            DrillDocumentStream.logger.debug("DocumentResultsListener[{}].queryIdArrived(queryId = {} , sql string = {}", new Object[]{Integer.valueOf(DrillDocumentStream.this.id), QueryIdHelper.getQueryId(queryId), DrillDocumentStream.this.sql});
            synchronized (DrillDocumentStream.this) {
                DrillDocumentStream.this.queryId = queryId;
                if (DrillDocumentStream.this.ignoreData) {
                    DrillDocumentStream.this.cancelQuery();
                }
            }
        }

        public void submissionFailed(UserException userException) {
            DrillDocumentStream.logger.debug("DocumentResultsListener[{}].submissionFailed(ex = {})", Integer.valueOf(DrillDocumentStream.this.id), userException);
            synchronized (DrillDocumentStream.this) {
                DrillDocumentStream.this.ignoreData = true;
            }
            this.drillMessageListener.failed(new OjaiException("Drill submissionFailed for \"" + DrillDocumentStream.this.queryContext.getSql() + "\" please check storageplugin passed to queryservice.", userException));
        }

        private void cleanup(DrillBuf drillBuf) {
            this.loader.clear();
            drillBuf.release();
        }

        public void dataArrived(QueryDataBatch queryDataBatch, ConnectionThrottle connectionThrottle) {
            DrillDocumentStream.logger.debug("DocumentResultsListener[{}].dataArrived(...)", Integer.valueOf(DrillDocumentStream.this.id));
            DrillDocumentStream.logger.debug("DocumentResultsListener[{}].dataArrived(...) synchronizing on ", Integer.valueOf(DrillDocumentStream.this.id), DrillDocumentStream.this);
            synchronized (DrillDocumentStream.this) {
                if (DrillDocumentStream.this.ignoreData) {
                    queryDataBatch.release();
                    return;
                }
                UserBitShared.QueryData header = queryDataBatch.getHeader();
                int rowCount = header.getRowCount();
                DrillDocumentStream.logger.debug("DocumentResultsListener[{}].dataArrived(...) rowCount = {}", Integer.valueOf(DrillDocumentStream.this.id), Integer.valueOf(rowCount));
                if (queryDataBatch.hasData()) {
                    DrillBuf data = queryDataBatch.getData();
                    try {
                        this.loader.load(header.getDef(), data);
                        LinkedList linkedList = new LinkedList();
                        DrillDocumentStream.logger.debug("DocumentResultsListener[{}].dataArrived(...) extracting first", Integer.valueOf(DrillDocumentStream.this.id));
                        linkedList.add(DrillDocumentStream.this.resultProcessor.extractFirst(this.loader));
                        for (int i = 1; i < rowCount; i++) {
                            DrillDocumentStream.logger.debug("DocumentResultsListener[{}].dataArrived(...) extracting {}", Integer.valueOf(DrillDocumentStream.this.id), Integer.valueOf(i));
                            linkedList.add(DrillDocumentStream.this.resultProcessor.extractNext(i));
                        }
                        cleanup(data);
                        this.drillMessageListener.documentsArrived(linkedList);
                    } catch (Exception e) {
                        DrillDocumentStream.this.cancelQuery();
                        try {
                            this.drillMessageListener.failed(e);
                            cleanup(data);
                            return;
                        } catch (Throwable th) {
                            cleanup(data);
                            return;
                        }
                    }
                }
                DrillDocumentStream.logger.debug("DocumentResultsListener[{}].dataArrived(...) returning", Integer.valueOf(DrillDocumentStream.this.id));
            }
        }

        public void queryCompleted(UserBitShared.QueryResult.QueryState queryState) {
            DrillDocumentStream.logger.debug("DocumentResultsListener[{}].queryCompleted({})", Integer.valueOf(DrillDocumentStream.this.id), queryState);
            DrillDocumentStream.this.queryCompleted = true;
            this.drillMessageListener.eos();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$DrillHandoff.class */
    public class DrillHandoff extends BlockingHandoff<DocumentBatch> {
        public DrillHandoff(long j) {
            super(j);
        }

        @Override // com.mapr.ojai.store.impl.BlockingHandoff
        protected void itemTimeout(long j) {
            throwQueryTimeout();
        }

        @Override // com.mapr.ojai.store.impl.BlockingHandoff
        protected void setup(CountDownLatch countDownLatch) {
            Preconditions.checkState(DrillDocumentStream.this.drillHandoff != null);
            DrillDocumentStream.this.asyncStreamTo(null);
            countDownLatch.countDown();
        }

        @Override // com.mapr.ojai.store.impl.BlockingHandoff
        protected void iteratorTimeout() {
            throwQueryTimeout();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$DrillMessageHandoff.class */
    public static class DrillMessageHandoff implements DrillMessageListener {
        private final DrillHandoff drillHandoff;

        public DrillMessageHandoff(DrillHandoff drillHandoff) {
            this.drillHandoff = drillHandoff;
        }

        @Override // com.mapr.ojai.store.impl.DrillDocumentStream.DrillMessageListener
        public void documentsArrived(List<Document> list) {
            this.drillHandoff.put((DrillHandoff) new DocumentBatch(list));
        }

        @Override // com.mapr.ojai.store.impl.DrillDocumentStream.DrillMessageListener
        public void failed(Exception exc) {
            this.drillHandoff.putException(exc);
        }

        @Override // com.mapr.ojai.store.impl.DrillDocumentStream.DrillMessageListener
        public void eos() {
            this.drillHandoff.putEos();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$DrillMessageListener.class */
    public interface DrillMessageListener {
        void documentsArrived(List<Document> list);

        void failed(Exception exc);

        void eos();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$DrillMessagePassthrough.class */
    public static class DrillMessagePassthrough implements DrillMessageListener {
        private final DocumentListener docListener;

        public DrillMessagePassthrough(DocumentListener documentListener) {
            this.docListener = documentListener;
        }

        @Override // com.mapr.ojai.store.impl.DrillDocumentStream.DrillMessageListener
        public void documentsArrived(List<Document> list) {
            Iterator<Document> it = list.iterator();
            while (it.hasNext()) {
                this.docListener.documentArrived(it.next());
            }
        }

        @Override // com.mapr.ojai.store.impl.DrillDocumentStream.DrillMessageListener
        public void failed(Exception exc) {
            this.docListener.failed(exc);
        }

        @Override // com.mapr.ojai.store.impl.DrillDocumentStream.DrillMessageListener
        public void eos() {
            this.docListener.eos();
        }
    }

    public DrillDocumentStream(OjaiConnection ojaiConnection, QueryContext queryContext, SharedDrillSession sharedDrillSession) {
        this.id = idGenerator.addAndGet(1);
        this.ojaiConnection = ojaiConnection;
        this.queryContext = queryContext;
        this.sql = queryContext.getSql();
        OjaiQuery query = queryContext.getQuery();
        long timeout = query == null ? -1L : query.getTimeout();
        this.timeoutMs = timeout != -1 ? timeout : Long.MAX_VALUE;
        this.resultProcessor = queryContext.isOjaiQuery() ? new OjaiResultProcessor(queryContext) : new SqlResultProcessor(queryContext.excludeId());
        if (sharedDrillSession != null) {
            this.sharedDrillSession = sharedDrillSession;
        } else {
            this.sharedDrillSession = new SharedDrillSession(ojaiConnection, queryContext.getClusterName());
        }
        this.sharedDrillSession.addRef();
    }

    public DrillDocumentStream(OjaiConnection ojaiConnection, QueryContext queryContext) {
        this(ojaiConnection, queryContext, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.mapr.ojai.store.impl.AbstractDocumentStream
    public synchronized void cancelQuery() {
        this.ignoreData = true;
        if (isCanceled()) {
            return;
        }
        if (!this.queryCompleted) {
            logger.debug("cancelling query {}", this.queryId != null ? QueryIdHelper.getQueryId(this.queryId) : "[UNKNOWN]");
            if (this.queryId != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug(new StackTrace().toString());
                }
                this.sharedDrillSession.getSession().cancelQuery(this.queryId);
            }
        }
        super.cancelQuery();
    }

    @Override // com.mapr.ojai.store.impl.QueryDocumentStream
    protected void submitQuery(DocumentListener documentListener) {
        DrillSession session = this.sharedDrillSession.getSession();
        this.decoupler = new DelegatingResultsListener(this.ojaiConnection.getExecutorService(), this.id, new DocumentResultsListener(session.getAllocator(), documentListener == null ? new DrillMessageHandoff(this.drillHandoff) : new DrillMessagePassthrough(documentListener)));
        logger.debug("DrillDocumentStream[{}] submitting \"{}\"", Integer.valueOf(this.id), this.queryContext.getSql());
        session.executeStatement(this.queryContext.getSql(), this.decoupler);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.mapr.ojai.store.impl.QueryDocumentStream, com.mapr.ojai.store.impl.AbstractDocumentStream
    public void closeDerived() {
        logger.debug("closeDerived()");
        if (this.decoupler != null) {
            this.decoupler.terminate();
        }
        this.sharedDrillSession.close();
        if (this.streamToLatch != null) {
            this.streamToLatch.countDown();
        }
    }

    public Iterator<Document> iterator() {
        checkState();
        this.drillHandoff = new DrillHandoff(this.timeoutMs);
        return new DocumentIterator(this.drillHandoff);
    }

    private void asyncStreamTo(DocumentListener documentListener) {
        checkState();
        this.isUsed = true;
        submitQuery(documentListener);
    }

    @Override // com.mapr.ojai.store.impl.AbstractDocumentStream
    public void streamTo(final DocumentListener documentListener) {
        checkState();
        this.streamToLatch = new CountDownLatch(1);
        asyncStreamTo(new DocumentListener() { // from class: com.mapr.ojai.store.impl.DrillDocumentStream.1
            public boolean documentArrived(Document document) {
                return documentListener.documentArrived(document);
            }

            public void eos() {
                try {
                    documentListener.eos();
                } finally {
                    DrillDocumentStream.this.streamToLatch.countDown();
                }
            }

            public void failed(Exception exc) {
                try {
                    documentListener.failed(exc);
                } finally {
                    DrillDocumentStream.this.streamToLatch.countDown();
                }
            }
        });
        try {
            this.streamToLatch.await();
        } catch (InterruptedException e) {
            throw new OjaiException(e);
        }
    }

    @Override // com.mapr.ojai.store.impl.QueryDocumentStream
    public OjaiQueryProperties.QueryPath getQueryPath() {
        return OjaiQueryProperties.QueryPath.DRILL;
    }

    public UserBitShared.QueryId getQueryId() {
        return this.queryId;
    }

    public QueryContext getQueryContext() {
        return this.queryContext;
    }

    @Override // com.mapr.ojai.store.impl.QueryDocumentStream
    public void getQueryPlan(List<Map<String, Object>> list) {
        HashMap hashMap = new HashMap();
        hashMap.put("streamName", getClass().getSimpleName());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("sql", this.sql);
        hashMap.put("parameters", hashMap2);
        list.add(hashMap);
    }
}
