/*
 * Decompiled with CFR 0.152.
 */
package voldemort.server.rebalance;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.admin.AsyncOperation;
import voldemort.server.rebalance.Rebalancer;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.metadata.MetadataStore;
import voldemort.utils.RebalanceUtils;

class RebalanceAsyncOperation
extends AsyncOperation {
    private static final Logger logger = Logger.getLogger(RebalanceAsyncOperation.class);
    private List<Integer> rebalanceStatusList;
    private AdminClient adminClient;
    private final ExecutorService executors;
    private final RebalancePartitionsInfo stealInfo;
    private final int maxParallelStoresRebalancing;
    private final VoldemortConfig voldemortConfig;
    private final MetadataStore metadataStore;
    private Rebalancer rebalancer;

    protected ExecutorService createExecutors(int numThreads) {
        return Executors.newFixedThreadPool(numThreads, new ThreadFactory(){

            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName(r.getClass().getName());
                return thread;
            }
        });
    }

    public RebalanceAsyncOperation(Rebalancer rebalancer, VoldemortConfig voldemortConfig, MetadataStore metadataStore, int requestId, RebalancePartitionsInfo stealInfo, int maxParallelStoresRebalancing) {
        super(requestId, "Rebalance Operation:" + stealInfo.toString());
        this.rebalancer = rebalancer;
        this.voldemortConfig = voldemortConfig;
        this.metadataStore = metadataStore;
        this.stealInfo = stealInfo;
        this.maxParallelStoresRebalancing = maxParallelStoresRebalancing;
        this.rebalanceStatusList = new ArrayList<Integer>();
        this.adminClient = null;
        this.executors = this.createExecutors(maxParallelStoresRebalancing);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void operate() throws Exception {
        block5: {
            this.adminClient = RebalanceUtils.createTempAdminClient(this.voldemortConfig, this.metadataStore.getCluster(), this.maxParallelStoresRebalancing * 4, this.maxParallelStoresRebalancing * 2);
            final ArrayList<Exception> failures = new ArrayList<Exception>();
            try {
                logger.info("starting rebalancing task" + this.stealInfo);
                for (final String storeName : ImmutableList.copyOf(this.stealInfo.getUnbalancedStoreList())) {
                    this.executors.submit(new Runnable(){

                        public void run() {
                            try {
                                RebalanceAsyncOperation.this.rebalanceStore(storeName, RebalanceAsyncOperation.this.adminClient, RebalanceAsyncOperation.this.stealInfo);
                                ArrayList<String> tempUnbalancedStoreList = new ArrayList<String>(RebalanceAsyncOperation.this.stealInfo.getUnbalancedStoreList());
                                tempUnbalancedStoreList.remove(storeName);
                                RebalanceAsyncOperation.this.stealInfo.setUnbalancedStoreList(tempUnbalancedStoreList);
                                RebalanceAsyncOperation.this.rebalancer.setRebalancingState(RebalanceAsyncOperation.this.stealInfo);
                            }
                            catch (Exception e) {
                                logger.error("rebalanceSubTask:" + RebalanceAsyncOperation.this.stealInfo + " failed for store:" + storeName, e);
                                failures.add(e);
                            }
                        }
                    });
                }
                this.waitForShutdown();
                if (this.stealInfo.getUnbalancedStoreList().isEmpty()) {
                    logger.info("Rebalancer: rebalance " + this.stealInfo + " completed successfully.");
                    this.metadataStore.cleanRebalancingState(this.stealInfo);
                    break block5;
                }
                throw new VoldemortRebalancingException("Failed to rebalance task " + this.stealInfo, failures);
            }
            finally {
                this.rebalancer.releaseRebalancingPermit(this.stealInfo.getDonorId());
                this.adminClient.stop();
                this.adminClient = null;
            }
        }
    }

    private void waitForShutdown() {
        try {
            this.executors.shutdown();
            this.executors.awaitTermination(this.voldemortConfig.getAdminSocketTimeout(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            logger.error("Interrupted while awaiting termination for executors.", e);
        }
    }

    public void stop() {
        this.updateStatus("stop() called on rebalance operation !!");
        if (null != this.adminClient) {
            for (int asyncID : this.rebalanceStatusList) {
                this.adminClient.stopAsyncRequest(this.metadataStore.getNodeId(), asyncID);
            }
        }
        this.executors.shutdownNow();
    }

    private void rebalanceStore(String storeName, AdminClient adminClient, RebalancePartitionsInfo stealInfo) throws Exception {
        logger.info("starting partitions migration for store:" + storeName);
        int asyncId = adminClient.migratePartitions(stealInfo.getDonorId(), this.metadataStore.getNodeId(), storeName, stealInfo.getPartitionList(), null);
        this.rebalanceStatusList.add(asyncId);
        adminClient.waitForCompletion(this.metadataStore.getNodeId(), asyncId, this.voldemortConfig.getAdminSocketTimeout(), TimeUnit.SECONDS);
        this.rebalanceStatusList.remove((Object)asyncId);
        if (stealInfo.getDeletePartitionsList().size() > 0) {
            adminClient.deletePartitions(stealInfo.getDonorId(), storeName, stealInfo.getDeletePartitionsList(), null);
            logger.debug("Deleted partitions " + stealInfo.getDeletePartitionsList() + " from donorNode:" + stealInfo.getDonorId() + " for store " + storeName);
        }
        logger.info("partitions migration for store:" + storeName + " completed.");
    }
}

