/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.ojai.store.impl;

import com.google.common.base.Preconditions;
import com.mapr.db.impl.OjaiQueryProperties;
import com.mapr.ojai.store.impl.BlockingHandoff;
import com.mapr.ojai.store.impl.DelegatingResultsListener;
import com.mapr.ojai.store.impl.OjaiConnection;
import com.mapr.ojai.store.impl.OjaiQuery;
import com.mapr.ojai.store.impl.QueryContext;
import com.mapr.ojai.store.impl.QueryDocumentStream;
import com.mapr.ojai.store.impl.SharedDrillSession;
import com.mapr.ojai.store.impl.results.OjaiResultProcessor;
import com.mapr.ojai.store.impl.results.QueryResultProcessor;
import com.mapr.ojai.store.impl.results.SqlResultProcessor;
import com.mapr.utils.StackTrace;
import io.netty.buffer.DrillBuf;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.rpc.user.clusterclient.DrillSession;
import org.ojai.Document;
import org.ojai.DocumentListener;
import org.ojai.exceptions.OjaiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DrillDocumentStream
extends QueryDocumentStream {
    private static final Logger logger = LoggerFactory.getLogger(DrillDocumentStream.class);
    private final SharedDrillSession sharedDrillSession;
    private final OjaiConnection ojaiConnection;
    private final QueryContext queryContext;
    private final QueryResultProcessor resultProcessor;
    private UserBitShared.QueryId queryId;
    private volatile boolean ignoreData;
    private final String sql;
    private final long timeoutMs;
    private volatile boolean queryCompleted;
    private static final AtomicInteger idGenerator = new AtomicInteger(0);
    private final int id = idGenerator.addAndGet(1);
    private DelegatingResultsListener decoupler;
    private static final DocumentBatch HANDOFF_SENTINEL = new DocumentBatch(null);
    private DrillHandoff drillHandoff;
    private CountDownLatch streamToLatch;

    public DrillDocumentStream(OjaiConnection ojaiConnection, QueryContext queryContext, SharedDrillSession sharedDrillSession) {
        this.ojaiConnection = ojaiConnection;
        this.queryContext = queryContext;
        this.sql = queryContext.getSql();
        OjaiQuery ojaiQuery = queryContext.getQuery();
        long queryTimeout = ojaiQuery == null ? -1L : ojaiQuery.getTimeout();
        this.timeoutMs = queryTimeout != -1L ? queryTimeout : Long.MAX_VALUE;
        this.resultProcessor = queryContext.isOjaiQuery() ? new OjaiResultProcessor(queryContext) : new SqlResultProcessor(queryContext.excludeId());
        this.sharedDrillSession = sharedDrillSession != null ? sharedDrillSession : new SharedDrillSession(ojaiConnection, queryContext.getClusterName());
        this.sharedDrillSession.addRef();
    }

    public DrillDocumentStream(OjaiConnection ojaiConnection, QueryContext queryContext) {
        this(ojaiConnection, queryContext, null);
    }

    @Override
    protected synchronized void cancelQuery() {
        this.ignoreData = true;
        if (!this.isCanceled()) {
            if (!this.queryCompleted) {
                String stringId = this.queryId != null ? QueryIdHelper.getQueryId((UserBitShared.QueryId)this.queryId) : "[UNKNOWN]";
                logger.debug("cancelling query {}", (Object)stringId);
                if (this.queryId != null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(new StackTrace().toString());
                    }
                    DrillSession drillSession = this.sharedDrillSession.getSession();
                    drillSession.cancelQuery(this.queryId);
                }
            }
            super.cancelQuery();
        }
    }

    @Override
    protected void submitQuery(DocumentListener docListener) {
        DrillSession drillSession = this.sharedDrillSession.getSession();
        DrillMessageListener dml = docListener == null ? new DrillMessageHandoff(this.drillHandoff) : new DrillMessagePassthrough(docListener);
        this.decoupler = new DelegatingResultsListener(this.ojaiConnection.getExecutorService(), this.id, new DocumentResultsListener(drillSession.getAllocator(), dml));
        String sql = this.queryContext.getSql();
        logger.debug("DrillDocumentStream[{}] submitting \"{}\"", (Object)this.id, (Object)sql);
        drillSession.executeStatement(this.queryContext.getSql(), (UserResultsListener)this.decoupler);
    }

    @Override
    protected void closeDerived() {
        logger.debug("closeDerived()");
        if (this.drillHandoff != null) {
            this.drillHandoff.close();
        }
        if (this.decoupler != null) {
            this.decoupler.terminate();
        }
        this.sharedDrillSession.close();
        if (this.streamToLatch != null) {
            this.streamToLatch.countDown();
        }
    }

    public Iterator<Document> iterator() {
        this.checkState();
        this.drillHandoff = new DrillHandoff(this.timeoutMs);
        DocumentIterator docIter = new DocumentIterator(this.drillHandoff);
        return docIter;
    }

    private void asyncStreamTo(DocumentListener docListener) {
        this.checkState();
        this.isUsed = true;
        this.submitQuery(docListener);
    }

    @Override
    public void streamTo(final DocumentListener docListener) {
        this.checkState();
        this.streamToLatch = new CountDownLatch(1);
        this.asyncStreamTo(new DocumentListener(){

            public boolean documentArrived(Document doc) {
                return docListener.documentArrived(doc);
            }

            public void eos() {
                try {
                    docListener.eos();
                }
                finally {
                    DrillDocumentStream.this.streamToLatch.countDown();
                }
            }

            public void failed(Exception ex) {
                try {
                    docListener.failed(ex);
                }
                finally {
                    DrillDocumentStream.this.streamToLatch.countDown();
                }
            }
        });
        try {
            this.streamToLatch.await();
        }
        catch (InterruptedException iex) {
            throw new OjaiException((Throwable)iex);
        }
    }

    @Override
    public OjaiQueryProperties.QueryPath getQueryPath() {
        return OjaiQueryProperties.QueryPath.DRILL;
    }

    public UserBitShared.QueryId getQueryId() {
        return this.queryId;
    }

    public QueryContext getQueryContext() {
        return this.queryContext;
    }

    @Override
    public void getQueryPlan(List<Map<String, Object>> planList) {
        HashMap<String, Object> myMap = new HashMap<String, Object>();
        myMap.put("streamName", this.getClass().getSimpleName());
        HashMap<String, String> valueMap = new HashMap<String, String>();
        valueMap.put("sql", this.sql);
        myMap.put("parameters", valueMap);
        planList.add(myMap);
    }

    private class DocumentIterator
    implements Iterator<Document> {
        private final Iterator<DocumentBatch> msgIter;
        private DocumentBatch documentBatch;

        public DocumentIterator(DrillHandoff drillHandoff) {
            this.msgIter = drillHandoff.iterator();
        }

        @Override
        public boolean hasNext() {
            while (true) {
                boolean b;
                if (this.documentBatch == null) {
                    if (!this.msgIter.hasNext()) {
                        return false;
                    }
                    this.documentBatch = this.msgIter.next();
                }
                if (b = this.documentBatch.hasNext()) {
                    return true;
                }
                this.documentBatch = null;
            }
        }

        @Override
        public Document next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.documentBatch.next();
        }
    }

    private class DrillHandoff
    extends BlockingHandoff<DocumentBatch> {
        public DrillHandoff(long timeoutMs) {
            super(HANDOFF_SENTINEL, timeoutMs);
        }

        @Override
        protected void itemTimeout(long timeoutMs) {
            this.throwQueryTimeout();
        }

        @Override
        protected void setup(CountDownLatch readyLatch) {
            Preconditions.checkState((DrillDocumentStream.this.drillHandoff != null ? 1 : 0) != 0);
            DrillDocumentStream.this.asyncStreamTo(null);
            readyLatch.countDown();
        }

        @Override
        protected void iteratorTimeout() {
            this.throwQueryTimeout();
        }
    }

    private class DocumentResultsListener
    implements UserResultsListener {
        private final RecordBatchLoader loader;
        private final DrillMessageListener drillMessageListener;

        public DocumentResultsListener(BufferAllocator drillAllocator, DrillMessageListener drillMessageListener) {
            this.loader = new RecordBatchLoader(drillAllocator);
            this.drillMessageListener = drillMessageListener;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void queryIdArrived(UserBitShared.QueryId queryId) {
            logger.debug("DocumentResultsListener[{}].queryIdArrived(queryId = {} , sql string = {}", new Object[]{DrillDocumentStream.this.id, QueryIdHelper.getQueryId((UserBitShared.QueryId)queryId), DrillDocumentStream.this.sql});
            DrillDocumentStream drillDocumentStream = DrillDocumentStream.this;
            synchronized (drillDocumentStream) {
                DrillDocumentStream.this.queryId = queryId;
                if (DrillDocumentStream.this.ignoreData) {
                    DrillDocumentStream.this.cancelQuery();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void submissionFailed(UserException ex) {
            logger.debug("DocumentResultsListener[{}].submissionFailed(ex = {})", (Object)DrillDocumentStream.this.id, (Object)ex);
            DrillDocumentStream drillDocumentStream = DrillDocumentStream.this;
            synchronized (drillDocumentStream) {
                DrillDocumentStream.this.ignoreData = true;
            }
            String exMsg = "Drill submissionFailed for \"" + DrillDocumentStream.this.queryContext.getSql() + '\"' + " please check storageplugin passed to queryservice.";
            this.drillMessageListener.failed((Exception)new OjaiException(exMsg, (Throwable)ex));
        }

        private void cleanup(DrillBuf drillBuf) {
            this.loader.clear();
            drillBuf.release();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
            logger.debug("DocumentResultsListener[{}].dataArrived(...)", (Object)DrillDocumentStream.this.id);
            logger.debug("DocumentResultsListener[{}].dataArrived(...) synchronizing on ", (Object)DrillDocumentStream.this.id, (Object)DrillDocumentStream.this);
            DrillDocumentStream drillDocumentStream = DrillDocumentStream.this;
            synchronized (drillDocumentStream) {
                if (DrillDocumentStream.this.ignoreData) {
                    result.release();
                    return;
                }
            }
            UserBitShared.QueryData queryHeader = result.getHeader();
            int rowCount = queryHeader.getRowCount();
            logger.debug("DocumentResultsListener[{}].dataArrived(...) rowCount = {}", (Object)DrillDocumentStream.this.id, (Object)rowCount);
            if (result.hasData()) {
                DrillBuf drillBuf = result.getData();
                try {
                    this.loader.load(queryHeader.getDef(), drillBuf);
                }
                catch (SchemaChangeException ex) {
                    DrillDocumentStream.this.cancelQuery();
                    try {
                        this.drillMessageListener.failed((Exception)((Object)ex));
                    }
                    finally {
                        this.cleanup(drillBuf);
                        return;
                    }
                }
                LinkedList<Document> docList = new LinkedList<Document>();
                logger.debug("DocumentResultsListener[{}].dataArrived(...) extracting first", (Object)DrillDocumentStream.this.id);
                docList.add(DrillDocumentStream.this.resultProcessor.extractFirst(this.loader));
                for (int iRow = 1; iRow < rowCount; ++iRow) {
                    logger.debug("DocumentResultsListener[{}].dataArrived(...) extracting {}", (Object)DrillDocumentStream.this.id, (Object)iRow);
                    docList.add(DrillDocumentStream.this.resultProcessor.extractNext(iRow));
                }
                this.cleanup(drillBuf);
                this.drillMessageListener.documentsArrived(docList);
            }
            logger.debug("DocumentResultsListener[{}].dataArrived(...) returning", (Object)DrillDocumentStream.this.id);
        }

        public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
            logger.debug("DocumentResultsListener[{}].queryCompleted({})", (Object)DrillDocumentStream.this.id, (Object)state);
            DrillDocumentStream.this.queryCompleted = true;
            this.drillMessageListener.eos();
        }
    }

    private static class DrillMessageHandoff
    implements DrillMessageListener {
        private final DrillHandoff drillHandoff;

        public DrillMessageHandoff(DrillHandoff drillHandoff) {
            this.drillHandoff = drillHandoff;
        }

        @Override
        public void documentsArrived(List<Document> docList) {
            this.drillHandoff.put(new DocumentBatch(docList));
        }

        @Override
        public void failed(Exception ex) {
            this.drillHandoff.putException(ex);
        }

        @Override
        public void eos() {
            this.drillHandoff.putEos();
        }
    }

    private static class DocumentBatch {
        private final Iterator<Document> docIter;

        public DocumentBatch(List<Document> docList) {
            this.docIter = docList == null ? null : docList.iterator();
        }

        public boolean hasNext() {
            if (this.docIter == null) {
                return false;
            }
            return this.docIter.hasNext();
        }

        public Document next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.docIter.next();
        }
    }

    private static class DrillMessagePassthrough
    implements DrillMessageListener {
        private final DocumentListener docListener;

        public DrillMessagePassthrough(DocumentListener docListener) {
            this.docListener = docListener;
        }

        @Override
        public void documentsArrived(List<Document> docList) {
            for (Document doc : docList) {
                this.docListener.documentArrived(doc);
            }
        }

        @Override
        public void failed(Exception ex) {
            this.docListener.failed(ex);
        }

        @Override
        public void eos() {
            this.docListener.eos();
        }
    }

    static interface DrillMessageListener {
        public void documentsArrived(List<Document> var1);

        public void failed(Exception var1);

        public void eos();
    }
}

