/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.client;

import com.google.inject.Inject;
import java.io.IOException;
import java.lang.constant.Constable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import org.apache.hive.druid.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Optional;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Iterators;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.RangeSet;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.util.concurrent.FutureCallback;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.com.metamx.common.Pair;
import org.apache.hive.druid.com.metamx.common.guava.BaseSequence;
import org.apache.hive.druid.com.metamx.common.guava.LazySequence;
import org.apache.hive.druid.com.metamx.common.guava.MergeSequence;
import org.apache.hive.druid.com.metamx.common.guava.Sequence;
import org.apache.hive.druid.com.metamx.common.guava.Sequences;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.client.CacheUtil;
import org.apache.hive.druid.io.druid.client.DruidServer;
import org.apache.hive.druid.io.druid.client.ServerView;
import org.apache.hive.druid.io.druid.client.TimelineServerView;
import org.apache.hive.druid.io.druid.client.cache.Cache;
import org.apache.hive.druid.io.druid.client.cache.CacheConfig;
import org.apache.hive.druid.io.druid.client.selector.QueryableDruidServer;
import org.apache.hive.druid.io.druid.client.selector.ServerSelector;
import org.apache.hive.druid.io.druid.concurrent.Execs;
import org.apache.hive.druid.io.druid.guice.annotations.BackgroundCaching;
import org.apache.hive.druid.io.druid.guice.annotations.Smile;
import org.apache.hive.druid.io.druid.query.BaseQuery;
import org.apache.hive.druid.io.druid.query.BySegmentResultValueClass;
import org.apache.hive.druid.io.druid.query.CacheStrategy;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryToolChest;
import org.apache.hive.druid.io.druid.query.QueryToolChestWarehouse;
import org.apache.hive.druid.io.druid.query.Result;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;
import org.apache.hive.druid.io.druid.query.aggregation.MetricManipulatorFns;
import org.apache.hive.druid.io.druid.query.filter.DimFilterUtils;
import org.apache.hive.druid.io.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.hive.druid.io.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.TimelineLookup;
import org.apache.hive.druid.io.druid.timeline.TimelineObjectHolder;
import org.apache.hive.druid.io.druid.timeline.partition.PartitionChunk;
import org.apache.hive.druid.io.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;

public class CachingClusteredClient<T>
implements QueryRunner<T> {
    private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);
    private final QueryToolChestWarehouse warehouse;
    private final TimelineServerView serverView;
    private final Cache cache;
    private final ObjectMapper objectMapper;
    private final CacheConfig cacheConfig;
    private final ListeningExecutorService backgroundExecutorService;

    @Inject
    public CachingClusteredClient(QueryToolChestWarehouse warehouse, TimelineServerView serverView, Cache cache, @Smile ObjectMapper objectMapper, @BackgroundCaching ExecutorService backgroundExecutorService, CacheConfig cacheConfig) {
        this.warehouse = warehouse;
        this.serverView = serverView;
        this.cache = cache;
        this.objectMapper = objectMapper;
        this.cacheConfig = cacheConfig;
        this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
        serverView.registerSegmentCallback(Execs.singleThreaded("CCClient-ServerView-CB-%d"), new ServerView.BaseSegmentCallback(){

            @Override
            public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) {
                CachingClusteredClient.this.cache.close(segment.getIdentifier());
                return ServerView.CallbackAction.CONTINUE;
            }
        });
    }

    /*
     * WARNING - void declaration
     */
    @Override
    public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext) {
        TimelineLookup<String, ServerSelector> timeline;
        final QueryToolChest toolChest = this.warehouse.getToolChest(query);
        final CacheStrategy strategy = toolChest.getCacheStrategy(query);
        final TreeMap serverSegments = Maps.newTreeMap();
        final ArrayList<Pair<Interval, byte[]>> cachedResults = Lists.newArrayList();
        final HashMap<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
        boolean useCache = BaseQuery.getContextUseCache(query, true) && strategy != null && this.cacheConfig.isUseCache() && this.cacheConfig.isQueryCacheable(query);
        final boolean populateCache = BaseQuery.getContextPopulateCache(query, true) && strategy != null && this.cacheConfig.isPopulateCache() && this.cacheConfig.isQueryCacheable(query);
        final boolean isBySegment = BaseQuery.getContextBySegment(query, false);
        final ImmutableMap.Builder<String, Constable> contextBuilder = new ImmutableMap.Builder<String, Constable>();
        int priority = BaseQuery.getContextPriority(query, 0);
        contextBuilder.put("priority", Integer.valueOf(priority));
        if (populateCache) {
            contextBuilder.put("populateCache", Boolean.valueOf(false));
            contextBuilder.put("bySegment", Boolean.valueOf(true));
        }
        if ((timeline = this.serverView.getTimeline(query.getDataSource())) == null) {
            return Sequences.empty();
        }
        LinkedHashSet<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();
        LinkedList<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
        int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0);
        if (uncoveredIntervalsLimit > 0) {
            ArrayList uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);
            boolean uncoveredIntervalsOverflowed = false;
            for (Interval interval : query.getIntervals()) {
                Iterable<TimelineObjectHolder<String, ServerSelector>> iterable = timeline.lookup(interval);
                long l = interval.getStartMillis();
                long endMillis = interval.getEndMillis();
                for (TimelineObjectHolder<String, ServerSelector> holder : iterable) {
                    Interval holderInterval = holder.getInterval();
                    long intervalStart = holderInterval.getStartMillis();
                    if (!uncoveredIntervalsOverflowed && l != intervalStart) {
                        if (uncoveredIntervalsLimit > uncoveredIntervals.size()) {
                            uncoveredIntervals.add(new Interval(l, intervalStart));
                        } else {
                            uncoveredIntervalsOverflowed = true;
                        }
                    }
                    l = holderInterval.getEndMillis();
                    serversLookup.add(holder);
                }
                if (uncoveredIntervalsOverflowed || l >= endMillis) continue;
                if (uncoveredIntervalsLimit > uncoveredIntervals.size()) {
                    uncoveredIntervals.add(new Interval(l, endMillis));
                    continue;
                }
                uncoveredIntervalsOverflowed = true;
            }
            if (!uncoveredIntervals.isEmpty()) {
                responseContext.put("uncoveredIntervals", uncoveredIntervals);
                responseContext.put("uncoveredIntervalsOverflowed", uncoveredIntervalsOverflowed);
            }
        } else {
            for (Interval interval : query.getIntervals()) {
                Iterables.addAll(serversLookup, timeline.lookup(interval));
            }
        }
        List filteredServersLookup = toolChest.filterSegments(query, serversLookup);
        HashMap<String, Optional<RangeSet<String>>> dimensionRangeCache = Maps.newHashMap();
        for (TimelineObjectHolder holder : filteredServersLookup) {
            Set<PartitionChunk<ServerSelector>> set = DimFilterUtils.filterShards(query.getFilter(), holder.getObject(), new Function<PartitionChunk<ServerSelector>, ShardSpec>(){

                @Override
                public ShardSpec apply(PartitionChunk<ServerSelector> input) {
                    return input.getObject().getSegment().getShardSpec();
                }
            }, dimensionRangeCache);
            for (PartitionChunk<ServerSelector> chunk : set) {
                ServerSelector selector = chunk.getObject();
                SegmentDescriptor descriptor = new SegmentDescriptor(holder.getInterval(), (String)holder.getVersion(), chunk.getChunkNumber());
                segments.add(Pair.of(selector, descriptor));
            }
        }
        byte[] queryCacheKey = (populateCache || useCache) && !isBySegment ? strategy.computeCacheKey(query) : null;
        if (queryCacheKey != null) {
            LinkedHashMap<Pair, Cache.NamedKey> cacheKeys = Maps.newLinkedHashMap();
            for (Pair pair : segments) {
                Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(((ServerSelector)pair.lhs).getSegment().getIdentifier(), (SegmentDescriptor)pair.rhs, queryCacheKey);
                cacheKeys.put(pair, segmentCacheKey);
            }
            if (useCache) {
                Map<Cache.NamedKey, byte[]> map = this.cache.getBulk(Iterables.limit(cacheKeys.values(), this.cacheConfig.getCacheBulkMergeLimit()));
            } else {
                ImmutableMap immutableMap = ImmutableMap.of();
            }
            for (Map.Entry entry : cacheKeys.entrySet()) {
                void var21_29;
                Pair segment = (Pair)entry.getKey();
                Cache.NamedKey segmentCacheKey = (Cache.NamedKey)entry.getValue();
                Interval segmentQueryInterval = ((SegmentDescriptor)segment.rhs).getInterval();
                byte[] cachedValue = (byte[])var21_29.get(segmentCacheKey);
                if (cachedValue != null) {
                    segments.remove(segment);
                    cachedResults.add(Pair.of(segmentQueryInterval, cachedValue));
                    continue;
                }
                if (!populateCache) continue;
                String segmentIdentifier = ((ServerSelector)segment.lhs).getSegment().getIdentifier();
                cachePopulatorMap.put(String.format("%s_%s", segmentIdentifier, segmentQueryInterval), new CachePopulator(this.cache, this.objectMapper, segmentCacheKey));
            }
        }
        for (Pair pair : segments) {
            QueryableDruidServer queryableDruidServer = ((ServerSelector)pair.lhs).pick();
            if (queryableDruidServer == null) {
                log.makeAlert("No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", pair.rhs, query.getDataSource()).emit();
                continue;
            }
            DruidServer server = queryableDruidServer.getServer();
            ArrayList descriptors = (ArrayList)serverSegments.get(server);
            if (descriptors == null) {
                descriptors = Lists.newArrayList();
                serverSegments.put(server, descriptors);
            }
            descriptors.add(pair.rhs);
        }
        return new LazySequence(new Supplier<Sequence<T>>(){

            @Override
            public Sequence<T> get() {
                ArrayList sequencesByInterval = Lists.newArrayList();
                this.addSequencesFromCache(sequencesByInterval);
                this.addSequencesFromServer(sequencesByInterval);
                return CachingClusteredClient.this.mergeCachedAndUncachedSequences(query, sequencesByInterval);
            }

            private void addSequencesFromCache(ArrayList<Sequence<T>> listOfSequences) {
                if (strategy == null) {
                    return;
                }
                Function pullFromCacheFunction = strategy.pullFromCache();
                final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz();
                for (Pair cachedResultPair : cachedResults) {
                    final byte[] cachedResult = (byte[])cachedResultPair.rhs;
                    BaseSequence<Object, Iterator<Object>> cachedSequence = new BaseSequence<Object, Iterator<Object>>(new BaseSequence.IteratorMaker<Object, Iterator<Object>>(){

                        @Override
                        public Iterator<Object> make() {
                            try {
                                if (cachedResult.length == 0) {
                                    return Iterators.emptyIterator();
                                }
                                return CachingClusteredClient.this.objectMapper.readValues(CachingClusteredClient.this.objectMapper.getFactory().createParser(cachedResult), cacheObjectClazz);
                            }
                            catch (IOException e) {
                                throw Throwables.propagate(e);
                            }
                        }

                        @Override
                        public void cleanup(Iterator<Object> iterFromMake) {
                        }
                    });
                    listOfSequences.add(Sequences.map(cachedSequence, pullFromCacheFunction));
                }
            }

            private void addSequencesFromServer(ArrayList<Sequence<T>> listOfSequences) {
                listOfSequences.ensureCapacity(listOfSequences.size() + serverSegments.size());
                final Query rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
                for (Map.Entry entry : serverSegments.entrySet()) {
                    Sequence<Object> resultSeqToAdd;
                    DruidServer server = (DruidServer)entry.getKey();
                    List descriptors = (List)entry.getValue();
                    QueryRunner clientQueryable = CachingClusteredClient.this.serverView.getQueryRunner(server);
                    if (clientQueryable == null) {
                        log.error("WTF!? server[%s] doesn't have a client Queryable?", server);
                        continue;
                    }
                    MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
                    if (!server.isAssignable() || !populateCache || isBySegment) {
                        if (!isBySegment) {
                            resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
                        } else {
                            Query bySegmentQuery = query;
                            Sequence resultSequence = clientQueryable.run(bySegmentQuery.withQuerySegmentSpec(segmentSpec), responseContext);
                            resultSeqToAdd = Sequences.map(resultSequence, new Function<Result<BySegmentResultValueClass<T>>, Result<BySegmentResultValueClass<T>>>(){

                                @Override
                                public Result<BySegmentResultValueClass<T>> apply(Result<BySegmentResultValueClass<T>> input) {
                                    BySegmentResultValueClass bySegmentValue = input.getValue();
                                    return new Result(input.getTimestamp(), new BySegmentResultValueClass(Lists.transform(bySegmentValue.getResults(), toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing())), bySegmentValue.getSegmentId(), bySegmentValue.getInterval()));
                                }
                            });
                        }
                    } else {
                        Sequence runningSequence = clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext);
                        resultSeqToAdd = new MergeSequence(query.getResultOrdering(), Sequences.map(runningSequence, new Function<Result<BySegmentResultValueClass<T>>, Sequence<T>>(){
                            private final Function<T, Object> cacheFn;
                            {
                                this.cacheFn = strategy.prepareForCache();
                            }

                            @Override
                            public Sequence<T> apply(Result<BySegmentResultValueClass<T>> input) {
                                BySegmentResultValueClass value = input.getValue();
                                final CachePopulator cachePopulator = (CachePopulator)cachePopulatorMap.get(String.format("%s_%s", value.getSegmentId(), value.getInterval()));
                                final ConcurrentLinkedQueue cacheFutures = new ConcurrentLinkedQueue();
                                return Sequences.withEffect(Sequences.map(Sequences.map(Sequences.simple(value.getResults()), new Function<T, T>(){

                                    @Override
                                    public T apply(final T input) {
                                        if (cachePopulator != null) {
                                            cacheFutures.add(CachingClusteredClient.this.backgroundExecutorService.submit(new Callable<Object>(){

                                                @Override
                                                public Object call() {
                                                    return cacheFn.apply(input);
                                                }
                                            }));
                                        }
                                        return input;
                                    }
                                }), toolChest.makePreComputeManipulatorFn(rewrittenQuery, MetricManipulatorFns.deserializing())), new Runnable(){

                                    @Override
                                    public void run() {
                                        if (cachePopulator != null) {
                                            Futures.addCallback(Futures.allAsList(cacheFutures), new FutureCallback<List<Object>>(){

                                                @Override
                                                public void onSuccess(List<Object> cacheData) {
                                                    cachePopulator.populate(cacheData);
                                                    cacheFutures.clear();
                                                }

                                                @Override
                                                public void onFailure(Throwable throwable) {
                                                    log.error(throwable, "Background caching failed", new Object[0]);
                                                }
                                            }, CachingClusteredClient.this.backgroundExecutorService);
                                        }
                                    }
                                }, MoreExecutors.sameThreadExecutor());
                            }
                        }));
                    }
                    listOfSequences.add(resultSeqToAdd);
                }
            }
        });
    }

    protected Sequence<T> mergeCachedAndUncachedSequences(Query<T> query, List<Sequence<T>> sequencesByInterval) {
        if (sequencesByInterval.isEmpty()) {
            return Sequences.empty();
        }
        return new MergeSequence<T>(query.getResultOrdering(), Sequences.simple(sequencesByInterval));
    }

    private static class CachePopulator {
        private final Cache cache;
        private final ObjectMapper mapper;
        private final Cache.NamedKey key;

        public CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key) {
            this.cache = cache;
            this.mapper = mapper;
            this.key = key;
        }

        public void populate(Iterable<Object> results) {
            CacheUtil.populate(this.cache, this.mapper, this.key, results);
        }
    }
}

