/*
 * Decompiled with CFR 0.152.
 */
package voldemort.client.protocol.admin;

import com.google.common.collect.AbstractIterator;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.client.protocol.pb.VProto;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.server.protocol.admin.AsyncOperationStatus;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.socket.SocketAndStreams;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class AdminClient {
    private static final Logger logger = Logger.getLogger(AdminClient.class);
    private final ErrorCodeMapper errorMapper;
    private final SocketPool pool;
    private final NetworkClassLoader networkClassLoader;
    private static final ClusterMapper clusterMapper = new ClusterMapper();
    private static final StoreDefinitionsMapper storeMapper = new StoreDefinitionsMapper();
    private static final long INITIAL_DELAY = 250L;
    private static final long MAX_DELAY = 60000L;
    private final AdminClientConfig adminClientConfig;
    private Cluster currentCluster;

    public AdminClient(String bootstrapURL, AdminClientConfig adminClientConfig) {
        this.currentCluster = this.getClusterFromBootstrapURL(bootstrapURL);
        this.errorMapper = new ErrorCodeMapper();
        this.pool = this.createSocketPool(adminClientConfig);
        this.networkClassLoader = new NetworkClassLoader(Thread.currentThread().getContextClassLoader());
        this.adminClientConfig = adminClientConfig;
    }

    public AdminClient(Cluster cluster, AdminClientConfig adminClientConfig) {
        this.currentCluster = cluster;
        this.errorMapper = new ErrorCodeMapper();
        this.pool = this.createSocketPool(adminClientConfig);
        this.networkClassLoader = new NetworkClassLoader(Thread.currentThread().getContextClassLoader());
        this.adminClientConfig = adminClientConfig;
    }

    private Cluster getClusterFromBootstrapURL(String bootstrapURL) {
        ClientConfig config = new ClientConfig();
        config.setBootstrapUrls(bootstrapURL);
        SocketStoreClientFactory factory = new SocketStoreClientFactory(config);
        String clusterXml = factory.bootstrapMetadataWithRetries("cluster.xml", factory.validateUrls(config.getBootstrapUrls()));
        factory.close();
        return clusterMapper.readCluster(new StringReader(clusterXml));
    }

    private SocketPool createSocketPool(AdminClientConfig config) {
        TimeUnit unit = TimeUnit.SECONDS;
        return new SocketPool(config.getMaxConnectionsPerNode(), (int)unit.toMillis(config.getAdminConnectionTimeoutSec()), (int)unit.toMillis(config.getAdminSocketTimeoutSec()), config.getAdminSocketBufferSize(), config.getAdminSocketKeepAlive());
    }

    private <T extends Message.Builder> T sendAndReceive(int nodeId, Message message, T builder) {
        Node node = this.getAdminClientCluster().getNodeById(nodeId);
        SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
        SocketAndStreams sands = this.pool.checkout(destination);
        try {
            DataOutputStream outputStream = sands.getOutputStream();
            DataInputStream inputStream = sands.getInputStream();
            ProtoUtils.writeMessage(outputStream, message);
            outputStream.flush();
            T t = ProtoUtils.readToBuilder(inputStream, builder);
            return t;
        }
        catch (IOException e) {
            this.close(sands.getSocket());
            throw new VoldemortException(e);
        }
        finally {
            this.pool.checkin(destination, sands);
        }
    }

    public void updateEntries(int nodeId, String storeName, Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator, VoldemortFilter filter) {
        Node node = this.getAdminClientCluster().getNodeById(nodeId);
        SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
        SocketAndStreams sands = this.pool.checkout(destination);
        DataOutputStream outputStream = sands.getOutputStream();
        DataInputStream inputStream = sands.getInputStream();
        boolean firstMessage = true;
        try {
            if (entryIterator.hasNext()) {
                while (entryIterator.hasNext()) {
                    Pair<ByteArray, Versioned<byte[]>> entry = entryIterator.next();
                    VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder().setKey(ProtoUtils.encodeBytes(entry.getFirst())).setVersioned(ProtoUtils.encodeVersioned(entry.getSecond())).build();
                    VAdminProto.UpdatePartitionEntriesRequest.Builder updateRequest = VAdminProto.UpdatePartitionEntriesRequest.newBuilder().setStore(storeName).setPartitionEntry(partitionEntry);
                    if (firstMessage) {
                        if (filter != null) {
                            updateRequest.setFilter(this.encodeFilter(filter));
                        }
                        ProtoUtils.writeMessage(outputStream, (Message)VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.UPDATE_PARTITION_ENTRIES).setUpdatePartitionEntries(updateRequest).build());
                        outputStream.flush();
                        firstMessage = false;
                        continue;
                    }
                    ProtoUtils.writeMessage(outputStream, (Message)updateRequest.build());
                }
                ProtoUtils.writeEndOfStream(outputStream);
                outputStream.flush();
                VAdminProto.UpdatePartitionEntriesResponse.Builder updateResponse = ProtoUtils.readToBuilder(inputStream, VAdminProto.UpdatePartitionEntriesResponse.newBuilder());
                if (updateResponse.hasError()) {
                    this.throwException(updateResponse.getError());
                }
            }
        }
        catch (IOException e) {
            this.close(sands.getSocket());
            throw new VoldemortException(e);
        }
        finally {
            this.pool.checkin(destination, sands);
        }
    }

    private void initiateFetchRequest(DataOutputStream outputStream, String storeName, List<Integer> partitionList, VoldemortFilter filter, boolean fetchValues, boolean fetchMasterEntries) throws IOException {
        VAdminProto.FetchPartitionEntriesRequest.Builder fetchRequest = VAdminProto.FetchPartitionEntriesRequest.newBuilder().addAllPartitions(partitionList).setFetchValues(fetchValues).setFetchMasterEntries(fetchMasterEntries).setStore(storeName);
        if (filter != null) {
            fetchRequest.setFilter(this.encodeFilter(filter));
        }
        VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.FETCH_PARTITION_ENTRIES).setFetchPartitionEntries(fetchRequest).build();
        ProtoUtils.writeMessage(outputStream, (Message)request);
        outputStream.flush();
    }

    private VAdminProto.FetchPartitionEntriesResponse responseFromStream(DataInputStream inputStream, int size) throws IOException {
        byte[] input = new byte[size];
        ByteUtils.read(inputStream, input);
        VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
        response.mergeFrom(input);
        return response.build();
    }

    public Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchEntries(int nodeId, String storeName, List<Integer> partitionList, VoldemortFilter filter, boolean fetchMasterEntries) {
        Node node = this.getAdminClientCluster().getNodeById(nodeId);
        final SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
        final SocketAndStreams sands = this.pool.checkout(destination);
        DataOutputStream outputStream = sands.getOutputStream();
        final DataInputStream inputStream = sands.getInputStream();
        try {
            this.initiateFetchRequest(outputStream, storeName, partitionList, filter, true, fetchMasterEntries);
        }
        catch (IOException e) {
            this.close(sands.getSocket());
            this.pool.checkin(destination, sands);
            throw new VoldemortException(e);
        }
        return new AbstractIterator<Pair<ByteArray, Versioned<byte[]>>>(){

            @Override
            public Pair<ByteArray, Versioned<byte[]>> computeNext() {
                try {
                    int size = inputStream.readInt();
                    if (size == -1) {
                        AdminClient.this.pool.checkin(destination, sands);
                        return (Pair)this.endOfData();
                    }
                    VAdminProto.FetchPartitionEntriesResponse response = AdminClient.this.responseFromStream(inputStream, size);
                    if (response.hasError()) {
                        AdminClient.this.pool.checkin(destination, sands);
                        AdminClient.this.throwException(response.getError());
                    }
                    VAdminProto.PartitionEntry partitionEntry = response.getPartitionEntry();
                    return Pair.create(ProtoUtils.decodeBytes(partitionEntry.getKey()), ProtoUtils.decodeVersioned(partitionEntry.getVersioned()));
                }
                catch (IOException e) {
                    AdminClient.this.close(sands.getSocket());
                    AdminClient.this.pool.checkin(destination, sands);
                    throw new VoldemortException(e);
                }
            }
        };
    }

    public Iterator<ByteArray> fetchKeys(int nodeId, String storeName, List<Integer> partitionList, VoldemortFilter filter, boolean fetchMasterEntries) {
        Node node = this.getAdminClientCluster().getNodeById(nodeId);
        final SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
        final SocketAndStreams sands = this.pool.checkout(destination);
        DataOutputStream outputStream = sands.getOutputStream();
        final DataInputStream inputStream = sands.getInputStream();
        try {
            this.initiateFetchRequest(outputStream, storeName, partitionList, filter, false, fetchMasterEntries);
        }
        catch (IOException e) {
            this.close(sands.getSocket());
            this.pool.checkin(destination, sands);
            throw new VoldemortException(e);
        }
        return new AbstractIterator<ByteArray>(){

            @Override
            public ByteArray computeNext() {
                try {
                    int size = inputStream.readInt();
                    if (size == -1) {
                        AdminClient.this.pool.checkin(destination, sands);
                        return (ByteArray)this.endOfData();
                    }
                    VAdminProto.FetchPartitionEntriesResponse response = AdminClient.this.responseFromStream(inputStream, size);
                    if (response.hasError()) {
                        AdminClient.this.pool.checkin(destination, sands);
                        AdminClient.this.throwException(response.getError());
                    }
                    return ProtoUtils.decodeBytes(response.getKey());
                }
                catch (IOException e) {
                    AdminClient.this.close(sands.getSocket());
                    AdminClient.this.pool.checkin(destination, sands);
                    throw new VoldemortException(e);
                }
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void restoreDataFromReplications(int nodeId, int parallelTransfers) {
        ExecutorService executors = Executors.newFixedThreadPool(parallelTransfers, new ThreadFactory(){

            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("restore-data-thread");
                return thread;
            }
        });
        try {
            List<StoreDefinition> storeDefList = this.getRemoteStoreDefList(nodeId).getValue();
            Cluster cluster = this.getRemoteCluster(nodeId).getValue();
            List<StoreDefinition> writableStores = RebalanceUtils.getWritableStores(storeDefList);
            for (StoreDefinition def : writableStores) {
                this.restoreStoreFromReplication(nodeId, cluster, def, executors);
            }
        }
        finally {
            executors.shutdown();
            try {
                executors.awaitTermination(this.adminClientConfig.getRestoreDataTimeout(), TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                logger.error("Interrupted while waiting restore operation to finish.");
            }
            logger.info("Finished restoring data.");
        }
    }

    private void restoreStoreFromReplication(final int restoringNodeId, Cluster cluster, final StoreDefinition storeDef, ExecutorService executorService) {
        logger.info("Restoring data for store:" + storeDef.getName());
        RoutingStrategyFactory factory = new RoutingStrategyFactory();
        RoutingStrategy strategy = factory.updateRoutingStrategy(storeDef, cluster);
        Map<Integer, List<Integer>> restoreMapping = this.getReplicationMapping(cluster, restoringNodeId, strategy);
        for (final Map.Entry<Integer, List<Integer>> replicationEntry : restoreMapping.entrySet()) {
            final int donorNodeId = replicationEntry.getKey();
            executorService.submit(new Runnable(){

                public void run() {
                    try {
                        logger.info("restoring data for store " + storeDef.getName() + " at node " + restoringNodeId + " from node " + replicationEntry.getKey() + " partitions:" + replicationEntry.getValue());
                        int migrateAsyncId = AdminClient.this.migratePartitions(donorNodeId, restoringNodeId, storeDef.getName(), (List)replicationEntry.getValue(), null);
                        AdminClient.this.waitForCompletion(restoringNodeId, migrateAsyncId, AdminClient.this.adminClientConfig.getRestoreDataTimeout(), TimeUnit.SECONDS);
                        logger.info("restoring data for store:" + storeDef.getName() + " from node " + donorNodeId + " completed.");
                    }
                    catch (Exception e) {
                        logger.error("restore operation for store " + storeDef.getName() + "from node " + donorNodeId + " failed.", e);
                    }
                }
            });
        }
    }

    private Map<Integer, List<Integer>> getReplicationMapping(Cluster cluster, int nodeId, RoutingStrategy strategy) {
        Map<Integer, Integer> partitionsToNodeMapping = RebalanceUtils.getCurrentPartitionMapping(cluster);
        HashMap<Integer, List<Integer>> restoreMapping = new HashMap<Integer, List<Integer>>();
        for (int partition : this.getNodePartitions(cluster, nodeId, strategy)) {
            List<Integer> replicationPartitionsList = strategy.getReplicatingPartitionList(partition);
            if (replicationPartitionsList.size() <= 1) continue;
            int index = 0;
            int replicatingPartition = replicationPartitionsList.get(index++);
            while (partitionsToNodeMapping.get(replicatingPartition) == nodeId) {
                replicatingPartition = replicationPartitionsList.get(index++);
            }
            int replicatingNode = partitionsToNodeMapping.get(replicatingPartition);
            if (!restoreMapping.containsKey(replicatingNode)) {
                restoreMapping.put(replicatingNode, new ArrayList());
            }
            if (restoreMapping.get(replicatingNode).contains(replicatingPartition)) continue;
            restoreMapping.get(replicatingNode).add(replicatingPartition);
        }
        return restoreMapping;
    }

    private List<Integer> getNodePartitions(Cluster cluster, int nodeId, RoutingStrategy strategy) {
        ArrayList<Integer> partitionsList = new ArrayList<Integer>(cluster.getNodeById(nodeId).getPartitionIds());
        Map<Integer, Integer> partitionsToNodeMapping = RebalanceUtils.getCurrentPartitionMapping(cluster);
        for (Node node : cluster.getNodes()) {
            if (node.getId() == nodeId) continue;
            for (int partition : node.getPartitionIds()) {
                List<Integer> replicatedPartitions = strategy.getReplicatingPartitionList(partition);
                for (int replicationPartition : replicatedPartitions) {
                    if (partitionsToNodeMapping.get(replicationPartition) != nodeId) continue;
                    partitionsList.add(partition);
                }
            }
        }
        return partitionsList;
    }

    public int rebalanceNode(RebalancePartitionsInfo stealInfo) {
        VAdminProto.InitiateRebalanceNodeRequest rebalanceNodeRequest = VAdminProto.InitiateRebalanceNodeRequest.newBuilder().setAttempt(stealInfo.getAttempt()).setDonorId(stealInfo.getDonorId()).setStealerId(stealInfo.getStealerId()).addAllPartitions(stealInfo.getPartitionList()).addAllUnbalancedStore(stealInfo.getUnbalancedStoreList()).addAllDeletePartitions(stealInfo.getDeletePartitionsList()).build();
        VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.INITIATE_REBALANCE_NODE).setInitiateRebalanceNode(rebalanceNodeRequest).build();
        VAdminProto.AsyncOperationStatusResponse.Builder response = this.sendAndReceive(stealInfo.getStealerId(), (Message)adminRequest, VAdminProto.AsyncOperationStatusResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
        return response.getRequestId();
    }

    public int migratePartitions(int donorNodeId, int stealerNodeId, String storeName, List<Integer> stealPartitionList, VoldemortFilter filter) {
        VAdminProto.InitiateFetchAndUpdateRequest.Builder initiateFetchAndUpdateRequest = VAdminProto.InitiateFetchAndUpdateRequest.newBuilder().setNodeId(donorNodeId).addAllPartitions(stealPartitionList).setStore(storeName);
        try {
            if (filter != null) {
                initiateFetchAndUpdateRequest.setFilter(this.encodeFilter(filter));
            }
        }
        catch (IOException e) {
            throw new VoldemortException(e);
        }
        VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder().setInitiateFetchAndUpdate(initiateFetchAndUpdateRequest).setType(VAdminProto.AdminRequestType.INITIATE_FETCH_AND_UPDATE).build();
        VAdminProto.AsyncOperationStatusResponse.Builder response = this.sendAndReceive(stealerNodeId, (Message)adminRequest, VAdminProto.AsyncOperationStatusResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
        return response.getRequestId();
    }

    public void truncate(int nodeId, String storeName) {
        VAdminProto.TruncateEntriesRequest.Builder truncateRequest = VAdminProto.TruncateEntriesRequest.newBuilder().setStore(storeName);
        VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.TRUNCATE_ENTRIES).setTruncateEntries(truncateRequest).build();
        VAdminProto.TruncateEntriesResponse.Builder response = this.sendAndReceive(nodeId, (Message)request, VAdminProto.TruncateEntriesResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
    }

    public AsyncOperationStatus getAsyncRequestStatus(int nodeId, int requestId) {
        VAdminProto.AsyncOperationStatusRequest asyncRequest = VAdminProto.AsyncOperationStatusRequest.newBuilder().setRequestId(requestId).build();
        VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.ASYNC_OPERATION_STATUS).setAsyncOperationStatus(asyncRequest).build();
        VAdminProto.AsyncOperationStatusResponse.Builder response = this.sendAndReceive(nodeId, (Message)adminRequest, VAdminProto.AsyncOperationStatusResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
        AsyncOperationStatus status = new AsyncOperationStatus(response.getRequestId(), response.getDescription());
        status.setStatus(response.getStatus());
        status.setComplete(response.getComplete());
        return status;
    }

    public List<Integer> getAsyncRequestList(int nodeId) {
        return this.getAsyncRequestList(nodeId, false);
    }

    public List<Integer> getAsyncRequestList(int nodeId, boolean showComplete) {
        VAdminProto.AsyncOperationListRequest asyncOperationListRequest = VAdminProto.AsyncOperationListRequest.newBuilder().setShowComplete(showComplete).build();
        VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.ASYNC_OPERATION_LIST).setAsyncOperationList(asyncOperationListRequest).build();
        VAdminProto.AsyncOperationListResponse.Builder response = this.sendAndReceive(nodeId, (Message)adminRequest, VAdminProto.AsyncOperationListResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
        return response.getRequestIdsList();
    }

    public void stopAsyncRequest(int nodeId, int requestId) {
        VAdminProto.AsyncOperationStopRequest asyncOperationStopRequest = VAdminProto.AsyncOperationStopRequest.newBuilder().setRequestId(requestId).build();
        VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.ASYNC_OPERATION_STOP).setAsyncOperationStop(asyncOperationStopRequest).build();
        VAdminProto.AsyncOperationStopResponse.Builder response = this.sendAndReceive(nodeId, (Message)adminRequest, VAdminProto.AsyncOperationStopResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
    }

    private VAdminProto.VoldemortFilter encodeFilter(VoldemortFilter filter) throws IOException {
        Class<?> cl = filter.getClass();
        byte[] classBytes = this.networkClassLoader.dumpClass(cl);
        return VAdminProto.VoldemortFilter.newBuilder().setName(cl.getName()).setData(ProtoUtils.encodeBytes(new ByteArray(classBytes))).build();
    }

    public int deletePartitions(int nodeId, String storeName, List<Integer> partitionList, VoldemortFilter filter) {
        VAdminProto.DeletePartitionEntriesRequest.Builder deleteRequest = VAdminProto.DeletePartitionEntriesRequest.newBuilder().addAllPartitions(partitionList).setStore(storeName);
        try {
            if (filter != null) {
                deleteRequest.setFilter(this.encodeFilter(filter));
            }
        }
        catch (IOException e) {
            throw new VoldemortException(e);
        }
        VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.DELETE_PARTITION_ENTRIES).setDeletePartitionEntries(deleteRequest).build();
        VAdminProto.DeletePartitionEntriesResponse.Builder response = this.sendAndReceive(nodeId, (Message)request, VAdminProto.DeletePartitionEntriesResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
        return response.getCount();
    }

    public void throwException(VProto.Error error) {
        throw this.errorMapper.getError((short)error.getErrorCode(), error.getErrorMessage());
    }

    private void close(Socket socket) {
        try {
            socket.close();
        }
        catch (IOException e) {
            logger.warn("Failed to close socket");
        }
    }

    public void stop() {
        this.pool.close();
    }

    public void waitForCompletion(int nodeId, int requestId, long maxWait, TimeUnit timeUnit) {
        long delay = 250L;
        long waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait);
        String description = null;
        while (System.currentTimeMillis() < waitUntil) {
            try {
                AsyncOperationStatus status = this.getAsyncRequestStatus(nodeId, requestId);
                logger.debug("Status for async task " + requestId + " at node " + nodeId + " is " + status);
                description = status.getDescription();
                if (status.hasException()) {
                    throw status.getException();
                }
                if (status.isComplete()) {
                    return;
                }
                if (delay < 60000L) {
                    delay <<= 1;
                }
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            catch (Exception e) {
                throw new VoldemortException("Failed while waiting for async task " + description + " at node " + nodeId + " to finish", e);
            }
        }
        throw new VoldemortException("Failed to finish task requestId:" + requestId + " in maxWait" + maxWait + " " + timeUnit.toString());
    }

    public void waitForCompletion(int nodeId, String key, String value, long maxWait, TimeUnit timeUnit) {
        long delay = 250L;
        long waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait);
        while (System.currentTimeMillis() < waitUntil) {
            String currentValue = this.getRemoteMetadata(nodeId, key).getValue();
            if (value.equals(currentValue)) {
                return;
            }
            logger.debug("waiting for value " + value + " for metadata key " + key + " from remote node " + nodeId + " currentValue " + currentValue);
            if (delay < 60000L) {
                delay <<= 1;
            }
            try {
                Thread.sleep(delay);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        throw new VoldemortException("Failed to get matching value " + value + " for key " + key + " at remote node " + nodeId + " in maximum wait" + maxWait + " " + timeUnit.toString() + " time.");
    }

    public void updateRemoteMetadata(int remoteNodeId, String key, Versioned<String> value) {
        ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(key, "UTF-8"));
        Versioned<byte[]> valueBytes = new Versioned<byte[]>(ByteUtils.getBytes(value.getValue(), "UTF-8"), value.getVersion());
        VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.UPDATE_METADATA).setUpdateMetadata(VAdminProto.UpdateMetadataRequest.newBuilder().setKey(ByteString.copyFrom((byte[])keyBytes.get())).setVersioned(ProtoUtils.encodeVersioned(valueBytes)).build()).build();
        VAdminProto.UpdateMetadataResponse.Builder response = this.sendAndReceive(remoteNodeId, (Message)request, VAdminProto.UpdateMetadataResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
    }

    public Versioned<String> getRemoteMetadata(int remoteNodeId, String key) {
        ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(key, "UTF-8"));
        VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.GET_METADATA).setGetMetadata(VAdminProto.GetMetadataRequest.newBuilder().setKey(ByteString.copyFrom((byte[])keyBytes.get()))).build();
        VAdminProto.GetMetadataResponse.Builder response = this.sendAndReceive(remoteNodeId, (Message)request, VAdminProto.GetMetadataResponse.newBuilder());
        if (response.hasError()) {
            this.throwException(response.getError());
        }
        Versioned<byte[]> value = ProtoUtils.decodeVersioned(response.getVersion());
        return new Versioned<String>(ByteUtils.getString(value.getValue(), "UTF-8"), value.getVersion());
    }

    public void updateRemoteCluster(int nodeId, Cluster cluster, Version clock) throws VoldemortException {
        this.updateRemoteMetadata(nodeId, "cluster.xml", new Versioned<String>(clusterMapper.writeCluster(cluster), clock));
    }

    public Versioned<Cluster> getRemoteCluster(int nodeId) throws VoldemortException {
        Versioned<String> value = this.getRemoteMetadata(nodeId, "cluster.xml");
        Cluster cluster = clusterMapper.readCluster(new StringReader(value.getValue()));
        return new Versioned<Cluster>(cluster, value.getVersion());
    }

    public void updateRemoteStoreDefList(int nodeId, List<StoreDefinition> storesList) throws VoldemortException {
        VectorClock oldClock = (VectorClock)this.getRemoteStoreDefList(nodeId).getVersion();
        this.updateRemoteMetadata(nodeId, "stores.xml", new Versioned<String>(storeMapper.writeStoreList(storesList), oldClock.incremented(nodeId, 1L)));
    }

    public Versioned<List<StoreDefinition>> getRemoteStoreDefList(int nodeId) throws VoldemortException {
        Versioned<String> value = this.getRemoteMetadata(nodeId, "stores.xml");
        List<StoreDefinition> storeList = storeMapper.readStoreList(new StringReader(value.getValue()));
        return new Versioned<List<StoreDefinition>>(storeList, value.getVersion());
    }

    public void updateRemoteServerState(int nodeId, MetadataStore.VoldemortState state, Version clock) {
        this.updateRemoteMetadata(nodeId, "server.state", new Versioned<String>(state.toString(), clock));
    }

    public Versioned<MetadataStore.VoldemortState> getRemoteServerState(int nodeId) {
        Versioned<String> value = this.getRemoteMetadata(nodeId, "server.state");
        return new Versioned<MetadataStore.VoldemortState>(MetadataStore.VoldemortState.valueOf(value.getValue()), value.getVersion());
    }

    public void updateRemoteClusterState(int nodeId, MetadataStore.VoldemortState state, Version clock) {
        this.updateRemoteMetadata(nodeId, "cluster.state", new Versioned<String>(state.toString(), clock));
    }

    public void addStore(StoreDefinition def) {
        String value = storeMapper.writeStore(def);
        VAdminProto.AddStoreRequest.Builder addStoreRequest = VAdminProto.AddStoreRequest.newBuilder().setStoreDefinition(value);
        VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.ADD_STORE).setAddStore(addStoreRequest).build();
        for (Node node : this.currentCluster.getNodes()) {
            VAdminProto.AddStoreResponse.Builder response = this.sendAndReceive(node.getId(), (Message)request, VAdminProto.AddStoreResponse.newBuilder());
            if (!response.hasError()) continue;
            this.throwException(response.getError());
        }
    }

    public void setAdminClientCluster(Cluster cluster) {
        this.currentCluster = cluster;
    }

    public Cluster getAdminClientCluster() {
        return this.currentCluster;
    }
}

