/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.ojai.store.impl;

import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.impl.OjaiQueryProperties;
import com.mapr.ojai.store.impl.AbstractDocumentFilter;
import com.mapr.ojai.store.impl.BlockingHandoff;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.ojai.Document;
import org.ojai.DocumentStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Document filter that reads its upstream stream on a worker thread and hands
 * each document to the consumer through a {@link BlockingHandoff} configured
 * with a timeout.
 *
 * <p>The worker ({@link UpstreamFetcher}) is submitted to the supplied
 * {@link ExecutorService}. When the handoff reports an item timeout, the worker
 * thread is interrupted and {@code throwQueryTimeout()} is invoked.
 */
public class TimeoutDocumentFilter extends AbstractDocumentFilter {
    private static final Logger logger = LoggerFactory.getLogger(TimeoutDocumentFilter.class);

    /**
     * Sentinel document passed to the {@link BlockingHandoff} constructor.
     * NOTE(review): presumably the handoff's internal marker value; its exact
     * role is defined in BlockingHandoff, which is not visible here.
     */
    private static final Document HANDOFF_SENTINEL = MapRDBImpl.newDocument();

    /** Timeout, in milliseconds, given to the handoff and reported in the query plan. */
    private final long timeoutMs;

    /**
     * Thread currently executing {@link UpstreamFetcher#run()}, or {@code null}
     * when no fetcher is active. It is cleared while holding the
     * {@link #timeoutHandoff} monitor so that {@code itemTimeout()} never
     * interrupts a thread that has already left the fetch loop.
     */
    private volatile Thread fetcherThread;

    private final TimeoutHandoff timeoutHandoff;
    private final ExecutorService executorService;

    /**
     * Creates a filter over {@code upstreamStream}.
     *
     * @param upstreamStream  the stream whose documents are forwarded
     * @param executorService executor that runs the upstream fetcher task
     * @param timeoutMs       timeout in milliseconds passed to the handoff
     */
    public TimeoutDocumentFilter(DocumentStream upstreamStream, ExecutorService executorService, long timeoutMs) {
        super(upstreamStream);
        // timeoutMs must be assigned before the handoff is built: its constructor reads it.
        this.timeoutMs = timeoutMs;
        this.executorService = executorService;
        this.timeoutHandoff = new TimeoutHandoff();
    }

    /**
     * Returns the handoff's iterator after validating this filter's state via
     * {@code checkState()}.
     *
     * @return iterator supplied by the timeout handoff
     */
    @Override
    public Iterator<Document> iterator() {
        this.checkState();
        return this.timeoutHandoff.iterator();
    }

    /**
     * Appends the upstream stream's query plan to {@code planList}, followed by
     * an entry describing this filter (its simple class name and
     * {@code timeoutMs}). Does nothing when there is no upstream stream.
     *
     * @param planList list that receives the plan entries
     */
    public void getQueryPlan(List<Map<String, Object>> planList) {
        if (this.upstreamStream == null) {
            return;
        }
        // NOTE(review): unchecked cast; assumes every upstream stream implements OjaiQueryProperties.
        ((OjaiQueryProperties)this.upstreamStream).getQueryPlan(planList);

        Map<String, Long> parameters = new HashMap<String, Long>();
        parameters.put("timeoutMs", this.timeoutMs);

        Map<String, Object> planEntry = new HashMap<String, Object>();
        planEntry.put("streamName", this.getClass().getSimpleName());
        planEntry.put("parameters", parameters);
        planList.add(planEntry);
    }

    /**
     * Task that pulls documents from the upstream iterator and pushes them,
     * end-of-stream, or a failure into the handoff.
     */
    private class UpstreamFetcher implements Runnable {
        private final CountDownLatch readyLatch;

        /**
         * @param readyLatch latch counted down once the fetcher has opened the
         *                   upstream iterator and published its thread
         */
        public UpstreamFetcher(CountDownLatch readyLatch) {
            this.readyLatch = readyLatch;
        }

        /**
         * Opens the upstream iterator, signals readiness, then forwards
         * documents until the filter is canceled, the upstream is exhausted, or
         * an exception occurs. The published thread reference is cleared on
         * every exit path.
         */
        @Override
        public void run() {
            // NOTE(review): if this call throws, readyLatch is never counted down;
            // whether the waiter recovers depends on BlockingHandoff (not visible here).
            TimeoutDocumentFilter.super.iterator();
            TimeoutDocumentFilter.this.fetcherThread = Thread.currentThread();
            logger.debug("UpstreamFetcher.run() ready");
            this.readyLatch.countDown();
            try {
                while (!TimeoutDocumentFilter.this.isCanceled()) {
                    try {
                        if (!TimeoutDocumentFilter.this.docIter.hasNext()) {
                            logger.debug("UpstreamFetcher.run() putEos()");
                            TimeoutDocumentFilter.this.timeoutHandoff.putEos();
                            return;
                        }
                        Document nextDoc = (Document)TimeoutDocumentFilter.this.docIter.next();
                        logger.debug("UpstreamFetcher.run() put()");
                        TimeoutDocumentFilter.this.timeoutHandoff.put(nextDoc);
                    }
                    catch (Exception ex) {
                        logger.debug("UpstreamFetcher.run() putException()");
                        TimeoutDocumentFilter.this.timeoutHandoff.putException(ex);
                        return;
                    }
                }
            }
            finally {
                // Always drop the thread reference (end-of-stream, cancellation and
                // failure alike): the executor may reuse this thread for unrelated
                // work, and a stale reference would let itemTimeout() interrupt it.
                // Same monitor as itemTimeout(), so check-and-interrupt stays atomic.
                synchronized (TimeoutDocumentFilter.this.timeoutHandoff) {
                    TimeoutDocumentFilter.this.fetcherThread = null;
                }
            }
        }
    }

    /**
     * Handoff specialization that starts the upstream fetcher and converts
     * timeouts into query-timeout failures.
     */
    private class TimeoutHandoff extends BlockingHandoff<Document> {
        public TimeoutHandoff() {
            super(HANDOFF_SENTINEL, TimeoutDocumentFilter.this.timeoutMs);
        }

        /** Reports an iterator timeout as a query timeout. */
        @Override
        protected void iteratorTimeout() {
            this.throwQueryTimeout();
        }

        /**
         * Interrupts the active fetcher thread, if any, and then reports a
         * query timeout.
         *
         * @param timeoutMs timeout value supplied by the caller; not used here
         */
        @Override
        protected void itemTimeout(long timeoutMs) {
            // Hold this handoff's monitor so the fetcher cannot clear the
            // reference between the null check and the interrupt.
            synchronized (this) {
                Thread activeFetcher = TimeoutDocumentFilter.this.fetcherThread;
                if (activeFetcher != null) {
                    activeFetcher.interrupt();
                }
            }
            this.throwQueryTimeout();
        }

        /**
         * Submits a new {@link UpstreamFetcher} to the executor.
         *
         * @param readyLatch latch the fetcher counts down once it is ready
         */
        @Override
        protected void setup(CountDownLatch readyLatch) {
            TimeoutDocumentFilter.this.executorService.execute(new UpstreamFetcher(readyLatch));
        }
    }
}

