/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.client;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hive.druid.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.Iterators;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.com.google.common.util.concurrent.SettableFuture;
import org.apache.hive.druid.io.druid.client.CacheUtil;
import org.apache.hive.druid.io.druid.client.cache.Cache;
import org.apache.hive.druid.io.druid.client.cache.CacheConfig;
import org.apache.hive.druid.io.druid.java.util.common.guava.BaseSequence;
import org.apache.hive.druid.io.druid.java.util.common.guava.Sequence;
import org.apache.hive.druid.io.druid.java.util.common.guava.Sequences;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;
import org.apache.hive.druid.io.druid.query.CacheStrategy;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryPlus;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryToolChest;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;

/**
 * A {@link QueryRunner} that wraps a base runner with a cache keyed per segment.
 *
 * <p>For each call to {@link #run} it asks the tool chest for a {@link CacheStrategy} and lets
 * {@link CacheUtil} decide whether the cache may be read ("use") and/or written ("populate") for the query:
 * <ul>
 *   <li>on a cache hit the cached bytes are deserialized and returned without calling the base runner;</li>
 *   <li>otherwise, when population is enabled, the base runner's results are returned and additionally
 *       handed to the background executor to be converted and stored in the cache;</li>
 *   <li>otherwise the call is delegated to the base runner unchanged.</li>
 * </ul>
 *
 * @param <T> the result row type produced by the wrapped runner
 */
public class CachingQueryRunner<T> implements QueryRunner<T> {
    private static final Logger log = new Logger(CachingQueryRunner.class);
    private final String segmentIdentifier;
    private final SegmentDescriptor segmentDescriptor;
    private final QueryRunner<T> base;
    private final QueryToolChest toolChest;
    private final Cache cache;
    private final ObjectMapper mapper;
    private final CacheConfig cacheConfig;
    private final ListeningExecutorService backgroundExecutorService;

    /**
     * Creates a caching wrapper around {@code base}.
     *
     * @param segmentIdentifier         identifier of the segment; part of the cache key
     * @param segmentDescriptor         descriptor of the segment; part of the cache key
     * @param mapper                    object mapper used to deserialize cached bytes (and passed to
     *                                  {@link CacheUtil#populate} when writing)
     * @param cache                     the cache to read from and write to
     * @param toolchest                 supplies the {@link CacheStrategy} for each query
     * @param base                      the runner that actually computes results
     * @param backgroundExecutorService executor on which cache-population work is submitted; it is wrapped
     *                                  with {@link MoreExecutors#listeningDecorator}
     * @param cacheConfig               configuration consulted by {@link CacheUtil} to enable reads/writes
     */
    public CachingQueryRunner(String segmentIdentifier, SegmentDescriptor segmentDescriptor, ObjectMapper mapper, Cache cache, QueryToolChest toolchest, QueryRunner<T> base, ExecutorService backgroundExecutorService, CacheConfig cacheConfig) {
        this.base = base;
        this.segmentIdentifier = segmentIdentifier;
        this.segmentDescriptor = segmentDescriptor;
        this.toolChest = toolchest;
        this.cache = cache;
        this.mapper = mapper;
        this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
        this.cacheConfig = cacheConfig;
    }

    /**
     * Runs the query, answering from the cache when allowed and possible, and otherwise delegating to the
     * base runner (optionally populating the cache with its results).
     *
     * @param queryPlus       the query to run
     * @param responseContext response context, passed through to the base runner untouched
     * @return the cached results on a cache hit, otherwise the base runner's results
     */
    @Override
    public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext) {
        final Query<T> query = queryPlus.getQuery();
        final CacheStrategy strategy = this.toolChest.getCacheStrategy(query);
        final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, this.cacheConfig);
        final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, this.cacheConfig);

        // The key is only needed (and only computable) when a strategy exists and the cache is involved.
        final Cache.NamedKey key;
        if (strategy != null && (useCache || populateCache)) {
            key = CacheUtil.computeSegmentCacheKey(this.segmentIdentifier, this.segmentDescriptor, strategy.computeCacheKey(query));
        } else {
            key = null;
        }

        // NOTE(review): the two branches below dereference strategy without a null check; this presumably
        // relies on CacheUtil returning false for a null strategy -- confirm against CacheUtil.
        if (useCache) {
            final Function pullFn = strategy.pullFromCache();
            final byte[] cachedResult = this.cache.get(key);
            if (cachedResult != null) {
                return readCachedResults(cachedResult, strategy.getCacheObjectClazz(), pullFn);
            }
        }
        if (populateCache) {
            return runAndPopulateCache(queryPlus, responseContext, key, strategy.prepareForCache());
        }
        return this.base.run(queryPlus, responseContext);
    }

    /**
     * Builds a sequence that deserializes {@code cachedResult} and maps every cached object through
     * {@code pullFn}. A zero-length byte array yields an empty sequence.
     */
    // Raw types are unavoidable here: the strategy obtained from the raw QueryToolChest field is itself raw.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Sequence<T> readCachedResults(final byte[] cachedResult, final TypeReference cacheObjectClazz, Function pullFn) {
        return Sequences.map(new BaseSequence(new BaseSequence.IteratorMaker<T, Iterator<T>>(){

            @Override
            public Iterator<T> make() {
                if (cachedResult.length == 0) {
                    return Collections.emptyIterator();
                }
                try {
                    return CachingQueryRunner.this.mapper.readValues(CachingQueryRunner.this.mapper.getFactory().createParser(cachedResult), cacheObjectClazz);
                }
                catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }

            @Override
            public void cleanup(Iterator<T> iterFromMake) {
            }
        }), pullFn);
    }

    /**
     * Runs the base runner and arranges for its results to be written to the cache under {@code key}.
     *
     * <p>Every row passing through the returned sequence is handed to the background executor, which applies
     * {@code prepareFn} to it and records the outcome in a future. The effect registered with
     * {@link Sequences#withEffect} waits for all of those futures and stores the collected values via
     * {@link CacheUtil#populate}; presumably it fires once the sequence has been consumed -- confirm against
     * {@code Sequences.withEffect}.
     */
    // prepareFn is raw for the same reason as in readCachedResults.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Sequence<T> runAndPopulateCache(QueryPlus<T> queryPlus, Map<String, Object> responseContext, final Cache.NamedKey key, final Function prepareFn) {
        // Synchronized wrapper: the list is appended to by the mapping function and read by the effect.
        final List<SettableFuture<Object>> cacheFutures = Collections.synchronizedList(Lists.<SettableFuture<Object>>newLinkedList());

        Sequence<T> results = Sequences.map(this.base.run(queryPlus, responseContext), new Function<T, T>(){

            @Override
            public T apply(final T input) {
                final SettableFuture<Object> future = SettableFuture.create();
                cacheFutures.add(future);
                CachingQueryRunner.this.backgroundExecutorService.submit(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            future.set(prepareFn.apply(input));
                        }
                        catch (Exception e) {
                            // Fail the future so the populate step below aborts instead of waiting forever.
                            future.setException(e);
                        }
                    }
                });
                // The row itself is passed downstream unchanged.
                return input;
            }
        });

        return Sequences.withEffect(results, new Runnable(){

            @Override
            public void run() {
                try {
                    CacheUtil.populate(CachingQueryRunner.this.cache, CachingQueryRunner.this.mapper, key, Futures.allAsList(cacheFutures).get());
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // get() cleared the interrupt flag; restore it so the owner of this thread can
                        // still observe the interruption after we wrap and rethrow.
                        Thread.currentThread().interrupt();
                    }
                    log.error(e, "Error while getting future for cache task");
                    throw Throwables.propagate(e);
                }
            }
        }, this.backgroundExecutorService);
    }
}

