/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.ojai.store.impl;

import com.mapr.db.impl.OjaiQueryProperties;
import com.mapr.ojai.store.impl.DelegatingResultsListener;
import com.mapr.ojai.store.impl.OjaiConnection;
import com.mapr.ojai.store.impl.QueryContext;
import com.mapr.ojai.store.impl.QueryDocumentStream;
import com.mapr.ojai.store.impl.SharedDrillSession;
import com.mapr.ojai.store.impl.results.OjaiResultProcessor;
import com.mapr.ojai.store.impl.results.QueryResultProcessor;
import com.mapr.ojai.store.impl.results.SqlResultProcessor;
import com.mapr.utils.StackTrace;
import io.netty.buffer.DrillBuf;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.rpc.user.clusterclient.DrillSession;
import org.ojai.Document;
import org.ojai.DocumentListener;
import org.ojai.exceptions.OjaiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DrillDocumentStream
extends QueryDocumentStream {
    private static final Logger logger = LoggerFactory.getLogger(DrillDocumentStream.class);
    private final SharedDrillSession sharedDrillSession;
    private final OjaiConnection ojaiConnection;
    private final QueryContext queryContext;
    private final QueryResultProcessor resultProcessor;
    private UserBitShared.QueryId queryId;
    private boolean ignoreData;
    private final String sql;
    private static final AtomicInteger idGenerator = new AtomicInteger(0);
    private final int id = idGenerator.addAndGet(1);

    public DrillDocumentStream(OjaiConnection ojaiConnection, QueryContext queryContext, SharedDrillSession sharedDrillSession) {
        this.ojaiConnection = ojaiConnection;
        this.queryContext = queryContext;
        this.sql = queryContext.getSql();
        this.resultProcessor = queryContext.isOjaiQuery() ? new OjaiResultProcessor(queryContext) : new SqlResultProcessor(queryContext.excludeId());
        this.sharedDrillSession = sharedDrillSession != null ? sharedDrillSession : new SharedDrillSession(ojaiConnection, queryContext.getClusterName());
        this.sharedDrillSession.addRef();
    }

    public DrillDocumentStream(OjaiConnection ojaiConnection, QueryContext queryContext) {
        this(ojaiConnection, queryContext, null);
    }

    @Override
    protected synchronized void cancelQuery() {
        if (!this.isClosed() && !this.isCanceled()) {
            this.ignoreData = true;
            if (this.queryId != null) {
                logger.info("Cancelling query " + QueryIdHelper.getQueryId((UserBitShared.QueryId)this.queryId));
                logger.debug(new StackTrace().toString());
                this.ojaiConnection.getExecutorService().execute(new CancellationRequest(this.sharedDrillSession, this.queryId));
                super.cancelQuery();
            }
        }
    }

    @Override
    protected void submitQuery(DocumentListener docListener) {
        DrillSession drillSession = this.sharedDrillSession.getSession();
        drillSession.executeStatement(this.queryContext.getSql(), (UserResultsListener)new DelegatingResultsListener(this.ojaiConnection.getExecutorService(), this.id, new DocumentResultsListener(drillSession.getAllocator(), docListener)));
    }

    @Override
    protected void closeDerived() {
        this.sharedDrillSession.close();
    }

    @Override
    public OjaiQueryProperties.QueryPath getQueryPath() {
        return OjaiQueryProperties.QueryPath.DRILL;
    }

    public UserBitShared.QueryId getQueryId() {
        return this.queryId;
    }

    public QueryContext getQueryContext() {
        return this.queryContext;
    }

    @Override
    public void getQueryPlan(List<Map<String, Object>> planList) {
        HashMap<String, Object> myMap = new HashMap<String, Object>();
        myMap.put("streamName", this.getClass().getSimpleName());
        HashMap<String, String> valueMap = new HashMap<String, String>();
        valueMap.put("sql", this.sql);
        myMap.put("parameters", valueMap);
        planList.add(myMap);
    }

    private class DocumentResultsListener
    implements UserResultsListener {
        private final DocumentListener documentListener;
        private final RecordBatchLoader loader;

        public DocumentResultsListener(BufferAllocator drillAllocator, DocumentListener documentListener) {
            this.documentListener = documentListener;
            this.loader = new RecordBatchLoader(drillAllocator);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void queryIdArrived(UserBitShared.QueryId queryId) {
            logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].queryIdArrived(queryId = " + QueryIdHelper.getQueryId((UserBitShared.QueryId)queryId) + " , sql string = " + DrillDocumentStream.this.sql + ")");
            DrillDocumentStream drillDocumentStream = DrillDocumentStream.this;
            synchronized (drillDocumentStream) {
                DrillDocumentStream.this.queryId = queryId;
                if (DrillDocumentStream.this.ignoreData) {
                    DrillDocumentStream.this.cancelQuery();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void submissionFailed(UserException ex) {
            logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].submissionFailed(ex = " + ex + ")");
            DrillDocumentStream drillDocumentStream = DrillDocumentStream.this;
            synchronized (drillDocumentStream) {
                DrillDocumentStream.this.ignoreData = true;
            }
            String exMsg = "Drill submissionFailed for \"" + DrillDocumentStream.this.queryContext.getSql() + '\"';
            this.documentListener.failed((Exception)new OjaiException(exMsg, (Throwable)ex));
        }

        private void cleanup(DrillBuf drillBuf) {
            this.loader.clear();
            drillBuf.release();
        }

        private void deliverDocument(Document document, DrillBuf drillBuf) {
            try {
                boolean stop;
                boolean bl = stop = !this.documentListener.documentArrived(document);
                if (stop) {
                    DrillDocumentStream.this.cancelQuery();
                }
            }
            catch (Exception ex) {
                DrillDocumentStream.this.cancelQuery();
                this.cleanup(drillBuf);
                logger.info("caught and ignored exception in user event listener " + ex);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
            logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...)");
            logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) synchronizing on " + DrillDocumentStream.this);
            DrillDocumentStream drillDocumentStream = DrillDocumentStream.this;
            synchronized (drillDocumentStream) {
                if (DrillDocumentStream.this.ignoreData) {
                    result.release();
                    return;
                }
            }
            UserBitShared.QueryData queryHeader = result.getHeader();
            int rowCount = queryHeader.getRowCount();
            logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) rowCount = " + rowCount);
            if (result.hasData()) {
                DrillBuf drillBuf = result.getData();
                try {
                    this.loader.load(queryHeader.getDef(), drillBuf);
                }
                catch (SchemaChangeException ex) {
                    DrillDocumentStream.this.cancelQuery();
                    this.documentListener.failed((Exception)((Object)ex));
                }
                logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) extracting first");
                this.deliverDocument(DrillDocumentStream.this.resultProcessor.extractFirst(this.loader), drillBuf);
                for (int iRow = 1; iRow < rowCount; ++iRow) {
                    DrillDocumentStream drillDocumentStream2 = DrillDocumentStream.this;
                    synchronized (drillDocumentStream2) {
                        if (DrillDocumentStream.this.ignoreData) {
                            break;
                        }
                    }
                    logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) extracting " + iRow);
                    this.deliverDocument(DrillDocumentStream.this.resultProcessor.extractNext(iRow), drillBuf);
                }
                this.cleanup(drillBuf);
            }
            logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].dataArrived(...) returning");
        }

        public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
            logger.debug("DocumentResultsListener[" + DrillDocumentStream.this.id + "].queryCompleted(" + state + ")");
            this.documentListener.eos();
        }
    }

    private static class CancellationRequest
    implements Runnable {
        private final SharedDrillSession sharedDrillSession;
        private final UserBitShared.QueryId queryId;

        public CancellationRequest(SharedDrillSession sharedDrillSession, UserBitShared.QueryId queryId) {
            this.sharedDrillSession = sharedDrillSession;
            sharedDrillSession.addRef();
            this.queryId = queryId;
        }

        @Override
        public void run() {
            DrillSession drillSession = this.sharedDrillSession.getSession();
            drillSession.cancelQuery(this.queryId);
            this.sharedDrillSession.close();
        }
    }
}

