package com.mapr.ojai.store.impl;

import com.mapr.db.impl.OjaiQueryProperties;
import com.mapr.ojai.store.impl.results.OjaiResultProcessor;
import com.mapr.ojai.store.impl.results.QueryResultProcessor;
import com.mapr.ojai.store.impl.results.SqlResultProcessor;
import com.mapr.utils.StackTrace;
import io.netty.buffer.DrillBuf;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.rpc.user.clusterclient.DrillSession;
import org.ojai.Document;
import org.ojai.DocumentListener;
import org.ojai.exceptions.OjaiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream.class */
public class DrillDocumentStream extends QueryDocumentStream {
    private final SharedDrillSession sharedDrillSession;
    private final OjaiConnection ojaiConnection;
    private final QueryContext queryContext;
    private final QueryResultProcessor resultProcessor;
    private UserBitShared.QueryId queryId;
    private boolean ignoreData;
    private final String sql;
    private final int id;
    private static final Logger logger = LoggerFactory.getLogger(DrillDocumentStream.class);
    private static final AtomicInteger idGenerator = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$CancellationRequest.class */
    public static class CancellationRequest implements Runnable {
        private final SharedDrillSession sharedDrillSession;
        private final UserBitShared.QueryId queryId;

        public CancellationRequest(SharedDrillSession sharedDrillSession, UserBitShared.QueryId queryId) {
            this.sharedDrillSession = sharedDrillSession;
            sharedDrillSession.addRef();
            this.queryId = queryId;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.sharedDrillSession.getSession().cancelQuery(this.queryId);
            this.sharedDrillSession.close();
        }
    }

    /* loaded from: input_file:com/mapr/ojai/store/impl/DrillDocumentStream$DocumentResultsListener.class */
    private class DocumentResultsListener implements UserResultsListener {
        private final DocumentListener documentListener;
        private final RecordBatchLoader loader;

        public DocumentResultsListener(BufferAllocator bufferAllocator, DocumentListener documentListener) {
            this.documentListener = documentListener;
            this.loader = new RecordBatchLoader(bufferAllocator);
        }

        public void queryIdArrived(UserBitShared.QueryId queryId) {
            DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].queryIdArrived(queryId = " + QueryIdHelper.getQueryId(queryId) + " , sql string = " + DrillDocumentStream.this.sql + ")");
            synchronized (DrillDocumentStream.this) {
                DrillDocumentStream.this.queryId = queryId;
                if (DrillDocumentStream.this.ignoreData) {
                    DrillDocumentStream.this.cancelQuery();
                }
            }
        }

        public void submissionFailed(UserException userException) {
            DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].submissionFailed(ex = " + userException + ")");
            synchronized (DrillDocumentStream.this) {
                DrillDocumentStream.this.ignoreData = true;
            }
            this.documentListener.failed(new OjaiException("Drill submissionFailed for \"" + DrillDocumentStream.this.queryContext.getSql() + '\"', userException));
        }

        private void cleanup(DrillBuf drillBuf) {
            this.loader.clear();
            drillBuf.release();
        }

        private void deliverDocument(Document document, DrillBuf drillBuf) {
            try {
                if (!this.documentListener.documentArrived(document)) {
                    DrillDocumentStream.this.cancelQuery();
                }
            } catch (Exception e) {
                DrillDocumentStream.this.cancelQuery();
                cleanup(drillBuf);
                DrillDocumentStream.logger.info("caught and ignored exception in user event listener " + e);
            }
        }

        public void dataArrived(QueryDataBatch queryDataBatch, ConnectionThrottle connectionThrottle) {
            DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...)");
            DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) synchronizing on " + DrillDocumentStream.this);
            synchronized (DrillDocumentStream.this) {
                if (DrillDocumentStream.this.ignoreData) {
                    queryDataBatch.release();
                    return;
                }
                UserBitShared.QueryData header = queryDataBatch.getHeader();
                int rowCount = header.getRowCount();
                DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) rowCount = " + rowCount);
                if (queryDataBatch.hasData()) {
                    DrillBuf data = queryDataBatch.getData();
                    try {
                        this.loader.load(header.getDef(), data);
                    } catch (SchemaChangeException e) {
                        DrillDocumentStream.this.cancelQuery();
                        this.documentListener.failed(e);
                    }
                    DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) extracting first");
                    deliverDocument(DrillDocumentStream.this.resultProcessor.extractFirst(this.loader), data);
                    int i = 1;
                    while (true) {
                        if (i >= rowCount) {
                            break;
                        }
                        synchronized (DrillDocumentStream.this) {
                            if (DrillDocumentStream.this.ignoreData) {
                                break;
                            }
                        }
                        break;
                        DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) extracting " + i);
                        deliverDocument(DrillDocumentStream.this.resultProcessor.extractNext(i), data);
                        i++;
                    }
                    cleanup(data);
                }
                DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) returning");
            }
        }

        public void queryCompleted(UserBitShared.QueryResult.QueryState queryState) {
            DrillDocumentStream.logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].queryCompleted(" + queryState + ")");
            this.documentListener.eos();
        }
    }

    public DrillDocumentStream(OjaiConnection ojaiConnection, QueryContext queryContext, SharedDrillSession sharedDrillSession) {
        this.id = idGenerator.addAndGet(1);
        this.ojaiConnection = ojaiConnection;
        this.queryContext = queryContext;
        this.sql = queryContext.getSql();
        this.resultProcessor = queryContext.isOjaiQuery() ? new OjaiResultProcessor(queryContext) : new SqlResultProcessor(queryContext.excludeId());
        if (sharedDrillSession != null) {
            this.sharedDrillSession = sharedDrillSession;
        } else {
            this.sharedDrillSession = new SharedDrillSession(ojaiConnection, queryContext.getClusterName());
        }
        this.sharedDrillSession.addRef();
    }

    public DrillDocumentStream(OjaiConnection ojaiConnection, QueryContext queryContext) {
        this(ojaiConnection, queryContext, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.mapr.ojai.store.impl.AbstractDocumentStream
    public synchronized void cancelQuery() {
        if (isClosed() || isCanceled()) {
            return;
        }
        this.ignoreData = true;
        if (this.queryId != null) {
            logger.info("Cancelling query " + QueryIdHelper.getQueryId(this.queryId));
            logger.debug(new StackTrace().toString());
            this.ojaiConnection.getExecutorService().execute(new CancellationRequest(this.sharedDrillSession, this.queryId));
            super.cancelQuery();
        }
    }

    @Override // com.mapr.ojai.store.impl.QueryDocumentStream
    protected void submitQuery(DocumentListener documentListener) {
        DrillSession session = this.sharedDrillSession.getSession();
        session.executeStatement(this.queryContext.getSql(), new DelegatingResultsListener(this.ojaiConnection.getExecutorService(), this.id, new DocumentResultsListener(session.getAllocator(), documentListener)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.mapr.ojai.store.impl.QueryDocumentStream, com.mapr.ojai.store.impl.AbstractDocumentStream
    public void closeDerived() {
        this.sharedDrillSession.close();
    }

    @Override // com.mapr.ojai.store.impl.QueryDocumentStream
    public OjaiQueryProperties.QueryPath getQueryPath() {
        return OjaiQueryProperties.QueryPath.DRILL;
    }

    public UserBitShared.QueryId getQueryId() {
        return this.queryId;
    }

    public QueryContext getQueryContext() {
        return this.queryContext;
    }

    @Override // com.mapr.ojai.store.impl.QueryDocumentStream
    public void getQueryPlan(List<Map<String, Object>> list) {
        HashMap hashMap = new HashMap();
        hashMap.put("streamName", getClass().getSimpleName());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("sql", this.sql);
        hashMap.put("parameters", hashMap2);
        list.add(hashMap);
    }
}
