/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordinator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.math3.util.FastMath;
import org.apache.hive.druid.com.google.common.base.Predicates;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.druid.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.server.coordinator.BalancerSegmentHolder;
import org.apache.hive.druid.io.druid.server.coordinator.BalancerStrategy;
import org.apache.hive.druid.io.druid.server.coordinator.CoordinatorStats;
import org.apache.hive.druid.io.druid.server.coordinator.ReservoirSegmentSampler;
import org.apache.hive.druid.io.druid.server.coordinator.ServerHolder;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.joda.time.Interval;

public class CostBalancerStrategy
implements BalancerStrategy {
    private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class);
    private static final double HALF_LIFE = 24.0;
    static final double LAMBDA = Math.log(2.0) / 24.0;
    static final double INV_LAMBDA_SQUARE = 1.0 / (LAMBDA * LAMBDA);
    private static final double MILLIS_IN_HOUR = 3600000.0;
    private static final double MILLIS_FACTOR = 3600000.0 / LAMBDA;
    private final ListeningExecutorService exec;

    public static double computeJointSegmentsCost(DataSegment segmentA, DataSegment segmentB) {
        Interval intervalA = segmentA.getInterval();
        Interval intervalB = segmentB.getInterval();
        double t0 = intervalA.getStartMillis();
        double t1 = ((double)intervalA.getEndMillis() - t0) / MILLIS_FACTOR;
        double start = ((double)intervalB.getStartMillis() - t0) / MILLIS_FACTOR;
        double end = ((double)intervalB.getEndMillis() - t0) / MILLIS_FACTOR;
        double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;
        return INV_LAMBDA_SQUARE * CostBalancerStrategy.intervalCost(t1, start, end) * multiplier;
    }

    public static double intervalCost(double x1, double y0, double y1) {
        if (x1 == 0.0 || y1 == y0) {
            return 0.0;
        }
        if (y0 < 0.0) {
            double tmp = x1;
            x1 = y1 - y0;
            y1 = tmp - y0;
            y0 = -y0;
        }
        if (y0 < x1) {
            double gamma;
            double beta;
            if (y1 <= x1) {
                beta = y1 - y0;
                gamma = x1 - y0;
            } else {
                beta = x1 - y0;
                gamma = y1 - y0;
            }
            return CostBalancerStrategy.intervalCost(y0, y0, y1) + CostBalancerStrategy.intervalCost(beta, beta, gamma) + 2.0 * (beta + FastMath.exp(-beta) - 1.0);
        }
        double exy0 = FastMath.exp(x1 - y0);
        double exy1 = FastMath.exp(x1 - y1);
        double ey0 = FastMath.exp(0.0 - y0);
        double ey1 = FastMath.exp(0.0 - y1);
        return ey1 - ey0 - (exy1 - exy0);
    }

    public CostBalancerStrategy(ListeningExecutorService exec) {
        this.exec = exec;
    }

    @Override
    public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders) {
        ServerHolder holder = (ServerHolder)this.chooseBestServer((DataSegment)proposalSegment, serverHolders, (boolean)false).rhs;
        if (holder != null && !holder.isServingSegment(proposalSegment)) {
            return holder;
        }
        return null;
    }

    @Override
    public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders) {
        return (ServerHolder)this.chooseBestServer((DataSegment)proposalSegment, serverHolders, (boolean)true).rhs;
    }

    static double computeJointSegmentsCost(DataSegment segment, Iterable<DataSegment> segmentSet) {
        double totalCost = 0.0;
        for (DataSegment s : segmentSet) {
            totalCost += CostBalancerStrategy.computeJointSegmentsCost(segment, s);
        }
        return totalCost;
    }

    @Override
    public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders) {
        ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
        return sampler.getRandomBalancerSegmentHolder(serverHolders);
    }

    public double calculateInitialTotalCost(List<ServerHolder> serverHolders) {
        double cost = 0.0;
        for (ServerHolder server : serverHolders) {
            Collection<DataSegment> segments = server.getServer().getSegments().values();
            for (DataSegment s : segments) {
                cost += CostBalancerStrategy.computeJointSegmentsCost(s, segments);
            }
        }
        return cost;
    }

    public double calculateNormalization(List<ServerHolder> serverHolders) {
        double cost = 0.0;
        for (ServerHolder server : serverHolders) {
            for (DataSegment segment : server.getServer().getSegments().values()) {
                cost += CostBalancerStrategy.computeJointSegmentsCost(segment, segment);
            }
        }
        return cost;
    }

    @Override
    public void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList) {
        double initialTotalCost = this.calculateInitialTotalCost(serverHolderList);
        double normalization = this.calculateNormalization(serverHolderList);
        double normalizedInitialCost = initialTotalCost / normalization;
        stats.addToTieredStat("initialCost", tier, (long)initialTotalCost);
        stats.addToTieredStat("normalization", tier, (long)normalization);
        stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long)(normalizedInitialCost * 1000.0));
        log.info("[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]", tier, initialTotalCost, normalization, normalizedInitialCost);
    }

    protected double computeCost(DataSegment proposalSegment, ServerHolder server, boolean includeCurrentServer) {
        long proposalSegmentSize = proposalSegment.getSize();
        if (!includeCurrentServer && server.isServingSegment(proposalSegment)) {
            return Double.POSITIVE_INFINITY;
        }
        if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
            return Double.POSITIVE_INFINITY;
        }
        double cost = 0.0;
        cost += CostBalancerStrategy.computeJointSegmentsCost(proposalSegment, Iterables.filter(server.getServer().getSegments().values(), Predicates.not(Predicates.equalTo(proposalSegment))));
        cost += CostBalancerStrategy.computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsToLoad());
        return cost -= CostBalancerStrategy.computeJointSegmentsCost(proposalSegment, server.getPeon().getSegmentsMarkedToDrop());
    }

    protected Pair<Double, ServerHolder> chooseBestServer(final DataSegment proposalSegment, Iterable<ServerHolder> serverHolders, final boolean includeCurrentServer) {
        Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
        ArrayList<Future> futures = Lists.newArrayList();
        for (final ServerHolder server : serverHolders) {
            futures.add(this.exec.submit(new Callable<Pair<Double, ServerHolder>>(){

                @Override
                public Pair<Double, ServerHolder> call() throws Exception {
                    return Pair.of(CostBalancerStrategy.this.computeCost(proposalSegment, server, includeCurrentServer), server);
                }
            }));
        }
        ListenableFuture resultsFuture = Futures.allAsList(futures);
        ArrayList<Pair> bestServers = new ArrayList<Pair>();
        bestServers.add(bestServer);
        try {
            for (Pair server : (List)resultsFuture.get()) {
                if (!((Double)server.lhs <= (Double)((Pair)bestServers.get((int)0)).lhs)) continue;
                if ((Double)server.lhs < (Double)((Pair)bestServers.get((int)0)).lhs) {
                    bestServers.clear();
                }
                bestServers.add(server);
            }
            bestServer = (Pair)bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
        }
        catch (Exception e) {
            log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.", new Object[0]).emit();
        }
        return bestServer;
    }
}

