/*
 * Decompiled with CFR 0.152.
 */
package voldemort.client.rebalance;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.rebalance.RebalanceClientConfig;
import voldemort.client.rebalance.RebalanceClusterPlan;
import voldemort.client.rebalance.RebalanceNodePlan;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.server.rebalance.AlreadyRebalancingException;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.UnreachableStoreException;
import voldemort.store.metadata.MetadataStore;
import voldemort.utils.RebalanceUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class RebalanceController {
    private static final int MAX_TRIES = 2;
    private static Logger logger = Logger.getLogger(RebalanceController.class);
    private final AdminClient adminClient;
    private final RebalanceClientConfig rebalanceConfig;

    public RebalanceController(String bootstrapUrl, RebalanceClientConfig rebalanceConfig) {
        this.adminClient = new AdminClient(bootstrapUrl, (AdminClientConfig)rebalanceConfig);
        this.rebalanceConfig = rebalanceConfig;
    }

    public RebalanceController(Cluster cluster, RebalanceClientConfig config) {
        this.adminClient = new AdminClient(cluster, (AdminClientConfig)config);
        this.rebalanceConfig = config;
    }

    private ExecutorService createExecutors(int numThreads) {
        return Executors.newFixedThreadPool(numThreads, new ThreadFactory(){

            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName(r.getClass().getName());
                return thread;
            }
        });
    }

    public void rebalance(Cluster targetCluster) {
        Versioned<Cluster> currentVersionedCluster = RebalanceUtils.getLatestCluster(new ArrayList<Integer>(), this.adminClient);
        this.rebalance(currentVersionedCluster.getValue(), targetCluster);
    }

    private SetMultimap<Integer, RebalancePartitionsInfo> divideRebalanceNodePlan(RebalanceNodePlan rebalanceNodePlan) {
        HashMultimap<Integer, RebalancePartitionsInfo> plan = HashMultimap.create();
        List<RebalancePartitionsInfo> rebalanceSubTaskList = rebalanceNodePlan.getRebalanceTaskList();
        for (RebalancePartitionsInfo rebalanceSubTask : rebalanceSubTaskList) {
            plan.put(rebalanceSubTask.getDonorId(), rebalanceSubTask);
        }
        return plan;
    }

    public void rebalance(Cluster currentCluster, Cluster targetCluster) {
        logger.debug("Current Cluster configuration:" + currentCluster);
        logger.debug("Target Cluster configuration:" + targetCluster);
        this.adminClient.setAdminClientCluster(currentCluster);
        final RebalanceClusterPlan rebalanceClusterPlan = new RebalanceClusterPlan(currentCluster, targetCluster, RebalanceUtils.getStoreNameList(currentCluster, this.adminClient), this.rebalanceConfig.isDeleteAfterRebalancingEnabled());
        logger.info(rebalanceClusterPlan);
        currentCluster = this.getClusterWithNewNodes(currentCluster, targetCluster);
        this.adminClient.setAdminClientCluster(currentCluster);
        Node firstNode = currentCluster.getNodes().iterator().next();
        VectorClock latestClock = (VectorClock)RebalanceUtils.getLatestCluster(new ArrayList<Integer>(), this.adminClient).getVersion();
        RebalanceUtils.propagateCluster(this.adminClient, currentCluster, latestClock.incremented(firstNode.getId(), System.currentTimeMillis()), new ArrayList<Integer>());
        ExecutorService executor = this.createExecutors(this.rebalanceConfig.getMaxParallelRebalancing());
        for (int nThreads = 0; nThreads < this.rebalanceConfig.getMaxParallelRebalancing(); ++nThreads) {
            executor.execute(new Runnable(){

                public void run() {
                    while (!rebalanceClusterPlan.getRebalancingTaskQueue().isEmpty()) {
                        RebalanceNodePlan rebalanceTask = rebalanceClusterPlan.getRebalancingTaskQueue().poll();
                        if (null == rebalanceTask) continue;
                        final int stealerNodeId = rebalanceTask.getStealerNode();
                        final SetMultimap rebalanceSubTaskMap = RebalanceController.this.divideRebalanceNodePlan(rebalanceTask);
                        Set parallelDonors = rebalanceSubTaskMap.keySet();
                        ExecutorService parallelDonorExecutor = RebalanceController.this.createExecutors(RebalanceController.this.rebalanceConfig.getMaxParallelDonors());
                        Iterator i$ = parallelDonors.iterator();
                        while (i$.hasNext()) {
                            final int donorNodeId = (Integer)i$.next();
                            parallelDonorExecutor.execute(new Runnable(){

                                public void run() {
                                    Set tasksForDonor = rebalanceSubTaskMap.get(donorNodeId);
                                    for (RebalancePartitionsInfo stealInfo : tasksForDonor) {
                                        logger.info("Starting rebalancing for stealerNode: " + stealerNodeId + " with rebalanceInfo: " + stealInfo);
                                        try {
                                            int rebalanceAsyncId = RebalanceController.this.startNodeRebalancing(stealInfo);
                                            try {
                                                RebalanceController.this.commitClusterChanges(RebalanceController.this.adminClient.getAdminClientCluster().getNodeById(stealerNodeId), stealInfo);
                                            }
                                            catch (Exception e) {
                                                if (-1 != rebalanceAsyncId) {
                                                    RebalanceController.this.adminClient.stopAsyncRequest(stealInfo.getStealerId(), rebalanceAsyncId);
                                                }
                                                throw e;
                                            }
                                            RebalanceController.this.adminClient.waitForCompletion(stealInfo.getStealerId(), rebalanceAsyncId, RebalanceController.this.rebalanceConfig.getRebalancingClientTimeoutSeconds(), TimeUnit.SECONDS);
                                            logger.info("Succesfully finished rebalance attempt: " + stealInfo);
                                        }
                                        catch (UnreachableStoreException e) {
                                            logger.error("StealerNode " + stealerNodeId + " is unreachable, please make sure it is up and running.", e);
                                        }
                                        catch (VoldemortRebalancingException e) {
                                            logger.error(e);
                                            for (Exception cause : e.getCauses()) {
                                                logger.error(cause);
                                            }
                                        }
                                        catch (Exception e) {
                                            logger.error("Rebalancing task failed with exception", e);
                                        }
                                    }
                                }
                            });
                        }
                        try {
                            RebalanceController.this.executorShutDown(parallelDonorExecutor);
                        }
                        catch (Exception e) {
                            logger.error("Interrupted", e);
                        }
                    }
                    logger.info("Thread run() finished:\n");
                }
            });
        }
        this.executorShutDown(executor);
    }

    private int startNodeRebalancing(RebalancePartitionsInfo rebalanceSubTask) {
        AlreadyRebalancingException exception = null;
        for (int nTries = 0; nTries < 2; ++nTries) {
            try {
                return this.adminClient.rebalanceNode(rebalanceSubTask);
            }
            catch (AlreadyRebalancingException e) {
                logger.info("Node " + rebalanceSubTask.getStealerId() + " is currently rebalancing will wait till it finish.");
                this.adminClient.waitForCompletion(rebalanceSubTask.getStealerId(), "server.state", MetadataStore.VoldemortState.NORMAL_SERVER.toString(), this.rebalanceConfig.getRebalancingClientTimeoutSeconds(), TimeUnit.SECONDS);
                exception = e;
                continue;
            }
        }
        throw new VoldemortException("Failed to start rebalancing at node " + rebalanceSubTask.getStealerId() + " with rebalanceInfo:" + rebalanceSubTask, exception);
    }

    private void executorShutDown(ExecutorService executorService) {
        try {
            executorService.shutdown();
            executorService.awaitTermination(this.rebalanceConfig.getRebalancingClientTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (Exception e) {
            logger.warn("Error while stoping executor service.", e);
        }
    }

    public AdminClient getAdminClient() {
        return this.adminClient;
    }

    public void stop() {
        this.adminClient.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void commitClusterChanges(Node stealerNode, RebalancePartitionsInfo rebalanceStealInfo) throws Exception {
        AdminClient adminClient = this.adminClient;
        synchronized (adminClient) {
            Cluster currentCluster = this.adminClient.getAdminClientCluster();
            Node donorNode = currentCluster.getNodeById(rebalanceStealInfo.getDonorId());
            Versioned<Cluster> latestCluster = RebalanceUtils.getLatestCluster(Arrays.asList(donorNode.getId(), rebalanceStealInfo.getStealerId()), this.adminClient);
            VectorClock latestClock = (VectorClock)latestCluster.getVersion();
            Cluster updatedCluster = RebalanceUtils.createUpdatedCluster(currentCluster, stealerNode, donorNode, rebalanceStealInfo.getStealMasterPartitions());
            latestClock.incrementVersion(stealerNode.getId(), System.currentTimeMillis());
            try {
                RebalanceUtils.propagateCluster(this.adminClient, updatedCluster, latestClock, Arrays.asList(stealerNode.getId(), rebalanceStealInfo.getDonorId()));
                this.adminClient.setAdminClientCluster(updatedCluster);
            }
            catch (Exception e) {
                updatedCluster = currentCluster;
                latestClock.incrementVersion(stealerNode.getId(), System.currentTimeMillis());
                RebalanceUtils.propagateCluster(this.adminClient, updatedCluster, latestClock, new ArrayList<Integer>());
                throw e;
            }
            this.adminClient.setAdminClientCluster(updatedCluster);
        }
    }

    private Cluster getClusterWithNewNodes(Cluster currentCluster, Cluster targetCluster) {
        ArrayList<Node> newNodes = new ArrayList<Node>();
        for (Node node : targetCluster.getNodes()) {
            if (RebalanceUtils.containsNode(currentCluster, node.getId())) continue;
            newNodes.add(RebalanceUtils.updateNode(node, new ArrayList<Integer>()));
        }
        return RebalanceUtils.updateCluster(currentCluster, newNodes);
    }
}

