/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordinator.helper;

import com.google.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.collect.HashMultiset;
import org.apache.hive.druid.com.google.common.collect.ImmutableSet;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.Multiset;
import org.apache.hive.druid.com.google.common.collect.Ordering;
import org.apache.hive.druid.io.druid.client.indexing.IndexingServiceClient;
import org.apache.hive.druid.io.druid.common.config.JacksonConfigManager;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.common.guava.FunctionalIterable;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;
import org.apache.hive.druid.io.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.hive.druid.io.druid.server.coordinator.CoordinatorStats;
import org.apache.hive.druid.io.druid.server.coordinator.DatasourceWhitelist;
import org.apache.hive.druid.io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.hive.druid.io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.TimelineObjectHolder;
import org.apache.hive.druid.io.druid.timeline.VersionedIntervalTimeline;
import org.apache.hive.druid.io.druid.timeline.partition.NoneShardSpec;
import org.apache.hive.druid.io.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;

/**
 * Coordinator helper that looks for runs of adjacent segments within each
 * datasource and asks the indexing service to merge them.
 *
 * <p>Only datasources accepted by the watched {@code "coordinator.whitelist"}
 * configuration are considered; when no whitelist is configured every
 * datasource is eligible. Only segments whose shard spec is a
 * {@link NoneShardSpec} are merge candidates.
 */
public class DruidCoordinatorSegmentMerger implements DruidCoordinatorHelper {
    private static final Logger log = new Logger(DruidCoordinatorSegmentMerger.class);

    private final IndexingServiceClient indexingServiceClient;
    private final AtomicReference<DatasourceWhitelist> whiteListRef;

    /**
     * @param indexingServiceClient client used to submit merge requests
     * @param configManager         source of the watched {@code "coordinator.whitelist"} value
     */
    @Inject
    public DruidCoordinatorSegmentMerger(IndexingServiceClient indexingServiceClient, JacksonConfigManager configManager) {
        this.indexingServiceClient = indexingServiceClient;
        this.whiteListRef = configManager.watch("coordinator.whitelist", DatasourceWhitelist.class);
    }

    /**
     * Builds one timeline per eligible datasource from the available segments,
     * walks each timeline in order, and issues a merge request for every group
     * of more than one timeline object that fits within the configured byte and
     * segment-count limits.
     *
     * @param params runtime parameters providing the available segments, the
     *               dynamic merge limits and the metric emitter
     * @return a copy of {@code params} carrying the stats collected by this run
     *         (the {@code "mergedCount"} global stat)
     */
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        final DatasourceWhitelist whitelist = whiteListRef.get();
        final CoordinatorStats stats = new CoordinatorStats();
        final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = Maps.newHashMap();

        // Group the eligible segments into one versioned timeline per datasource.
        for (DataSegment dataSegment : params.getAvailableSegments()) {
            if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) {
                VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(dataSegment.getDataSource());
                if (timeline == null) {
                    timeline = new VersionedIntervalTimeline<String, DataSegment>(Ordering.<String>natural());
                    dataSources.put(dataSegment.getDataSource(), timeline);
                }
                timeline.add(
                    dataSegment.getInterval(),
                    dataSegment.getVersion(),
                    dataSegment.getShardSpec().createChunk(dataSegment)
                );
            }
        }

        for (Map.Entry<String, VersionedIntervalTimeline<String, DataSegment>> entry : dataSources.entrySet()) {
            final VersionedIntervalTimeline<String, DataSegment> timeline = entry.getValue();
            final List<TimelineObjectHolder<String, DataSegment>> timelineObjects =
                timeline.lookup(new Interval(DateTimes.EPOCH, DateTimes.of("3000-01-01")));

            // Accumulate timeline objects greedily until one is rejected or a limit is hit.
            SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
            for (int i = 0; i < timelineObjects.size(); ++i) {
                if (!segmentsToMerge.add(timelineObjects.get(i))
                    || segmentsToMerge.getByteCount() > params.getCoordinatorDynamicConfig().getMergeBytesLimit()
                    || segmentsToMerge.getSegmentCount() >= params.getCoordinatorDynamicConfig().getMergeSegmentsLimit()) {
                    // Drop trailing objects until the group is complete and within the byte
                    // limit, and rewind the cursor so the dropped objects are revisited.
                    i -= segmentsToMerge.backtrack(params.getCoordinatorDynamicConfig().getMergeBytesLimit());

                    if (segmentsToMerge.getSegmentCount() > 1) {
                        stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
                    }

                    if (segmentsToMerge.getSegmentCount() == 0) {
                        // Backtracked all the way to zero; step forward so the loop makes progress.
                        ++i;
                    }

                    segmentsToMerge = new SegmentsToMerge();
                }
            }

            // Flush whatever group is still pending at the end of the timeline.
            segmentsToMerge.backtrack(params.getCoordinatorDynamicConfig().getMergeBytesLimit());
            if (segmentsToMerge.getSegmentCount() > 1) {
                stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
            }
        }

        log.info("Issued merge requests for %s segments", stats.getGlobalStat("mergedCount"));
        params.getEmitter().emit(
            new ServiceMetricEvent.Builder().build("coordinator/merge/count", stats.getGlobalStat("mergedCount"))
        );
        return params.buildFromExisting().withCoordinatorStats(stats).build();
    }

    /**
     * Submits a merge request for the segments held by {@code segmentsToMerge}.
     *
     * <p>Failures of the indexing service call are logged and not rethrown.
     * NOTE(review): the segment count is returned even when the request fails,
     * so {@code "mergedCount"} includes failed requests -- confirm this is intended.
     *
     * @param segmentsToMerge group of timeline objects to merge
     * @param dataSource      datasource name, used for logging only
     * @return the number of distinct segments in the request
     */
    private int mergeSegments(SegmentsToMerge segmentsToMerge, String dataSource) {
        final List<DataSegment> segments = segmentsToMerge.getSegments();
        final List<String> segmentNames = Lists.transform(segments, new Function<DataSegment, String>() {
            @Override
            public String apply(DataSegment input) {
                return input.getIdentifier();
            }
        });

        log.info("[%s] Found %d segments to merge %s", dataSource, segments.size(), segmentNames);

        try {
            indexingServiceClient.mergeSegments(segments);
        }
        catch (Exception e) {
            log.error(e, "[%s] Merging error for segments [%s]", dataSource, segmentNames);
        }

        return segments.size();
    }

    /**
     * Mutable accumulator of consecutive timeline objects that are candidates
     * for a single merge request.
     *
     * <p>Each stored pair holds the timeline object and the cumulative
     * "underlying" interval (smallest start to largest end of the segments'
     * own intervals) of everything added up to and including that object.
     */
    private static class SegmentsToMerge {
        // Counts how many held timeline objects reference each segment, so a
        // segment's size is only added to / removed from byteCount once.
        private final Multiset<DataSegment> segments;
        private final List<Pair<TimelineObjectHolder<String, DataSegment>, Interval>> timelineObjects = Lists.newArrayList();
        private long byteCount = 0L;

        private SegmentsToMerge() {
            this.segments = HashMultiset.create();
        }

        /**
         * @return the distinct segments referenced by the held timeline objects,
         *         in timeline order
         */
        public List<DataSegment> getSegments() {
            return ImmutableSet.copyOf(
                FunctionalIterable.create(timelineObjects).transformCat(
                    new Function<Pair<TimelineObjectHolder<String, DataSegment>, Interval>, Iterable<DataSegment>>() {
                        @Override
                        public Iterable<DataSegment> apply(Pair<TimelineObjectHolder<String, DataSegment>, Interval> input) {
                            return Iterables.transform(
                                input.lhs.getObject(),
                                new Function<PartitionChunk<DataSegment>, DataSegment>() {
                                    @Override
                                    public DataSegment apply(PartitionChunk<DataSegment> input) {
                                        return input.getObject();
                                    }
                                }
                            );
                        }
                    }
                )
            ).asList();
        }

        /**
         * Tries to append a timeline object to this group.
         *
         * @param timelineObject next timeline object; must not start before the
         *                       end of the previously added one
         * @return {@code true} if the object was added; {@code false} if any of
         *         its segments does not use a {@link NoneShardSpec}, in which
         *         case this group is left unchanged
         * @throws IllegalArgumentException if objects are supplied out of order
         * @throws ISE if the timeline object holds no partition chunk
         */
        public boolean add(TimelineObjectHolder<String, DataSegment> timelineObject) {
            final Interval timelineObjectInterval = timelineObject.getInterval();

            if (timelineObjects.size() > 0) {
                Preconditions.checkArgument(
                    timelineObjectInterval.getStart().getMillis()
                        >= timelineObjects.get(timelineObjects.size() - 1).lhs.getInterval().getEnd().getMillis(),
                    "timeline objects must be provided in order"
                );
            }

            final PartitionChunk<DataSegment> firstChunk = Iterables.getFirst(timelineObject.getObject(), null);
            if (firstChunk == null) {
                throw new ISE("Unable to find an underlying interval");
            }
            final Interval underlyingInterval = firstChunk.getObject().getInterval();

            // Validate every chunk before touching any state, so a rejected object
            // cannot leave byteCount/segments out of sync with timelineObjects.
            for (PartitionChunk<DataSegment> chunk : timelineObject.getObject()) {
                if (!(chunk.getObject().getShardSpec() instanceof NoneShardSpec)) {
                    return false;
                }
            }

            for (PartitionChunk<DataSegment> chunk : timelineObject.getObject()) {
                final DataSegment segment = chunk.getObject();
                segments.add(segment);
                // Only the first reference to a segment contributes to the byte total.
                if (segments.count(segment) == 1) {
                    byteCount += segment.getSize();
                }
            }

            // Store the object together with the running union of underlying intervals.
            final Interval mergedUnderlyingInterval = getMergedUnderlyingInterval();
            if (mergedUnderlyingInterval == null) {
                timelineObjects.add(Pair.of(timelineObject, underlyingInterval));
            } else {
                final DateTime start = underlyingInterval.getStart().isBefore(mergedUnderlyingInterval.getStart())
                    ? underlyingInterval.getStart()
                    : mergedUnderlyingInterval.getStart();
                final DateTime end = underlyingInterval.getEnd().isAfter(mergedUnderlyingInterval.getEnd())
                    ? underlyingInterval.getEnd()
                    : mergedUnderlyingInterval.getEnd();
                timelineObjects.add(Pair.of(timelineObject, new Interval(start, end)));
            }

            return true;
        }

        /**
         * @return the span from the first held timeline object's start to the
         *         last one's end, or {@code null} if nothing is held
         */
        public Interval getMergedTimelineInterval() {
            if (timelineObjects.isEmpty()) {
                return null;
            }
            return new Interval(
                timelineObjects.get(0).lhs.getInterval().getStart(),
                timelineObjects.get(timelineObjects.size() - 1).lhs.getInterval().getEnd()
            );
        }

        /**
         * @return the cumulative underlying interval recorded with the last
         *         held timeline object, or {@code null} if nothing is held
         */
        public Interval getMergedUnderlyingInterval() {
            if (timelineObjects.isEmpty()) {
                return null;
            }
            return timelineObjects.get(timelineObjects.size() - 1).rhs;
        }

        /** @return total size of the distinct segments currently held */
        public long getByteCount() {
            return byteCount;
        }

        /** @return the number of timeline objects currently held */
        public int getSegmentCount() {
            return timelineObjects.size();
        }

        /**
         * @return {@code true} if nothing is held, or if the timeline span of the
         *         held objects equals the merged underlying interval
         */
        public boolean isComplete() {
            return timelineObjects.size() == 0 || getMergedTimelineInterval().equals(getMergedUnderlyingInterval());
        }

        /**
         * Removes trailing timeline objects until the group is complete and its
         * byte count does not exceed {@code maxSize}.
         *
         * @param maxSize maximum allowed byte count; must be non-negative
         * @return the number of timeline objects removed
         * @throws IllegalArgumentException if {@code maxSize} is negative
         */
        public int backtrack(long maxSize) {
            Preconditions.checkArgument(maxSize >= 0L, "maxSize >= 0");

            int removed = 0;
            while (!isComplete() || byteCount > maxSize) {
                ++removed;
                final TimelineObjectHolder<String, DataSegment> removedHolder =
                    timelineObjects.remove(timelineObjects.size() - 1).lhs;
                for (PartitionChunk<DataSegment> chunk : removedHolder.getObject()) {
                    final DataSegment segment = chunk.getObject();
                    segments.remove(segment);
                    // Release the bytes only once no held object references the segment.
                    if (segments.count(segment) == 0) {
                        byteCount -= segment.getSize();
                    }
                }
            }

            return removed;
        }
    }
}

