/*
 * Decompiled with CFR 0.152.
 */
package voldemort.store.rebalancing;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.server.RequestRoutingType;
import voldemort.server.StoreRepository;
import voldemort.server.rebalance.RebalancerState;
import voldemort.store.DelegatingStore;
import voldemort.store.Store;
import voldemort.store.StoreUtils;
import voldemort.store.UnreachableStoreException;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.rebalancing.ProxyUnreachableException;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.store.socket.SocketStore;
import voldemort.utils.ByteArray;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * A {@link DelegatingStore} that, while this server is in the
 * {@code REBALANCING_MASTER_SERVER} state, pulls a key's values from the donor
 * node named by the current rebalancer state and writes them into the wrapped
 * store before serving {@code get}, {@code getVersions}, {@code getAll} and
 * {@code put} locally.
 * <p>
 * {@code delete} is forwarded to the wrapped store without any proxying.
 * Calls to donor nodes are reported to the {@link FailureDetector} and a donor
 * that is unavailable or unreachable surfaces as a
 * {@link ProxyUnreachableException}.
 */
public class RedirectingStore extends DelegatingStore<ByteArray, byte[]> {

    private static final Logger logger = Logger.getLogger(RedirectingStore.class);

    /** Store name for which {@link #redirectingKey(ByteArray)} never redirects. */
    private static final String METADATA_STORE_NAME = "metadata";

    /** Nanoseconds per millisecond, used to convert elapsed {@link System#nanoTime()} deltas. */
    private static final long NS_PER_MS = 1000000L;

    private final MetadataStore metadata;
    private final StoreRepository storeRepository;
    private final SocketPool socketPool;
    private final FailureDetector failureDetector;

    /**
     * @param innerStore      the local store that ultimately serves every request
     * @param metadata        source of server state, cluster, routing strategy and rebalancer state
     * @param storeRepository registry in which the socket stores to donor nodes are kept
     * @param detector        consulted before, and notified after, every donor call
     * @param socketPool      pool handed to newly created donor socket stores
     */
    public RedirectingStore(Store<ByteArray, byte[]> innerStore, MetadataStore metadata, StoreRepository storeRepository, FailureDetector detector, SocketPool socketPool) {
        super(innerStore);
        this.metadata = metadata;
        this.storeRepository = storeRepository;
        this.socketPool = socketPool;
        this.failureDetector = detector;
    }

    /**
     * Copies the donor's versions of {@code key} into the wrapped store when the
     * key is being redirected, then performs the put on the wrapped store.
     */
    @Override
    public void put(ByteArray key, Versioned<byte[]> value) throws VoldemortException {
        this.proxyIfRedirecting(key);
        this.getInnerStore().put(key, value);
    }

    /**
     * Copies the donor's versions of {@code key} into the wrapped store when the
     * key is being redirected, then reads from the wrapped store.
     */
    @Override
    public List<Versioned<byte[]>> get(ByteArray key) throws VoldemortException {
        this.proxyIfRedirecting(key);
        return this.getInnerStore().get(key);
    }

    /**
     * Copies the donor's versions of {@code key} into the wrapped store when the
     * key is being redirected, then returns the wrapped store's versions.
     */
    @Override
    public List<Version> getVersions(ByteArray key) {
        this.proxyIfRedirecting(key);
        return this.getInnerStore().getVersions(key);
    }

    /**
     * Collects the subset of {@code keys} that is being redirected, fetches that
     * subset from the donor nodes into the wrapped store, and then answers the
     * whole request from the wrapped store.
     */
    @Override
    public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys) throws VoldemortException {
        int maxLength = Iterables.size(keys);
        List<ByteArray> redirectingKeys = Lists.newArrayListWithExpectedSize(maxLength);
        List<RebalancePartitionsInfo> rebalancePartitionsInfos = Lists.newArrayListWithExpectedSize(maxLength);
        for (ByteArray key : keys) {
            RebalancePartitionsInfo info = this.redirectingKey(key);
            if (info != null) {
                redirectingKeys.add(key);
                rebalancePartitionsInfos.add(info);
            }
        }
        if (!redirectingKeys.isEmpty()) {
            this.proxyGetAllAndLocalPut(redirectingKeys, rebalancePartitionsInfos);
        }
        return this.getInnerStore().getAll(keys);
    }

    /**
     * Validates the key and deletes from the wrapped store only; the donor node
     * is not contacted.
     */
    @Override
    public boolean delete(ByteArray key, Version version) throws VoldemortException {
        StoreUtils.assertValidKey(key);
        return this.getInnerStore().delete(key, version);
    }

    /**
     * Shared prelude of {@code put}, {@code get} and {@code getVersions}: when
     * {@link #redirectingKey(ByteArray)} yields rebalance info for the key, fetch
     * the key from that info's donor and store the result locally.
     */
    private void proxyIfRedirecting(ByteArray key) {
        RebalancePartitionsInfo stealInfo = this.redirectingKey(key);
        if (stealInfo != null) {
            this.proxyGetAndLocalPut(key, stealInfo.getDonorId());
        }
    }

    /**
     * Looks up the rebalance info covering {@code key}.
     *
     * @return whatever the rebalancer state finds for the key's partition list,
     *         or {@code null} when the server state is not
     *         {@code REBALANCING_MASTER_SERVER} or this store is named
     *         {@code "metadata"}; callers treat {@code null} as "do not redirect"
     */
    private RebalancePartitionsInfo redirectingKey(ByteArray key) {
        boolean rebalancing = MetadataStore.VoldemortState.REBALANCING_MASTER_SERVER.equals(this.metadata.getServerState());
        if (rebalancing && !this.getName().equals(METADATA_STORE_NAME)) {
            List<Integer> partitionIds = this.metadata.getRoutingStrategy(this.getName()).getPartitionList(key.get());
            return this.getRebalancePartitionsInfo(partitionIds);
        }
        return null;
    }

    /** Asks the current rebalancer state for the info matching this store and the given partitions. */
    private RebalancePartitionsInfo getRebalancePartitionsInfo(List<Integer> partitionIds) {
        RebalancerState rebalancerState = this.metadata.getRebalancerState();
        return rebalancerState.find(this.getName(), partitionIds);
    }

    /**
     * Reads {@code key} from the donor node's socket store.
     *
     * @throws ProxyUnreachableException if the failure detector reports the donor
     *         unavailable, or the call fails with an {@link UnreachableStoreException}
     */
    private List<Versioned<byte[]>> proxyGet(ByteArray key, int donorNodeId) {
        Node donorNode = this.metadata.getCluster().getNodeById(donorNodeId);
        this.checkNodeAvailable(donorNode);
        long startNs = System.nanoTime();
        try {
            Store<ByteArray, byte[]> redirectingStore = this.getRedirectingSocketStore(this.getName(), donorNodeId);
            List<Versioned<byte[]>> values = redirectingStore.get(key);
            this.recordSuccess(donorNode, startNs);
            return values;
        } catch (UnreachableStoreException e) {
            this.recordException(donorNode, startNs, e);
            throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode, e);
        }
    }

    /**
     * Fails fast when the failure detector does not consider the donor available.
     *
     * @throws ProxyUnreachableException if {@code donorNode} is reported unavailable
     */
    private void checkNodeAvailable(Node donorNode) {
        if (!this.failureDetector.isAvailable(donorNode)) {
            throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode + " is marked down by failure detector.");
        }
    }

    /**
     * Groups {@code keys} by donor node and issues one {@code getAll} per donor.
     * <p>
     * A key is assigned to a donor when any partition in the key's routing
     * partition list appears in that steal info's partition list.
     *
     * @return the merged per-donor results
     * @throws ProxyUnreachableException if any donor is unavailable or unreachable
     */
    private Map<ByteArray, List<Versioned<byte[]>>> proxyGetAll(List<ByteArray> keys, List<RebalancePartitionsInfo> stealInfoList) throws VoldemortException {
        HashMultimap<Integer, ByteArray> scatterMap = HashMultimap.create(stealInfoList.size(), keys.size());
        for (ByteArray key : keys) {
            // The partition list depends only on the key, so compute it once
            // rather than once per steal info.
            List<Integer> keyPartitions = this.metadata.getRoutingStrategy(this.getName()).getPartitionList(key.get());
            for (RebalancePartitionsInfo stealInfo : stealInfoList) {
                for (int partition : keyPartitions) {
                    if (stealInfo.getPartitionList().contains(partition)) {
                        scatterMap.put(stealInfo.getDonorId(), key);
                        // One match is enough: HashMultimap ignores a repeated
                        // (donor, key) pair, so further matches add nothing.
                        break;
                    }
                }
            }
        }

        Map<ByteArray, List<Versioned<byte[]>>> gatherMap = Maps.newHashMapWithExpectedSize(keys.size());
        for (int donorNodeId : scatterMap.keySet()) {
            Node donorNode = this.metadata.getCluster().getNodeById(donorNodeId);
            this.checkNodeAvailable(donorNode);
            long startNs = System.nanoTime();
            try {
                Map<ByteArray, List<Versioned<byte[]>>> resultsForNode = this.getRedirectingSocketStore(this.getName(), donorNodeId).getAll(scatterMap.get(donorNodeId));
                this.recordSuccess(donorNode, startNs);
                gatherMap.putAll(resultsForNode);
            } catch (UnreachableStoreException e) {
                this.recordException(donorNode, startNs, e);
                throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode, e);
            }
        }
        return gatherMap;
    }

    /** Fetches {@code key} from the donor and writes every returned version into the wrapped store. */
    private void proxyGetAndLocalPut(ByteArray key, int donorId) throws VoldemortException {
        this.putLocallyIgnoringObsolete(key, this.proxyGet(key, donorId));
    }

    /** Fetches all {@code keys} from their donors and writes every returned version into the wrapped store. */
    private void proxyGetAllAndLocalPut(List<ByteArray> keys, List<RebalancePartitionsInfo> stealInfoList) throws VoldemortException {
        Map<ByteArray, List<Versioned<byte[]>>> proxyKeyValues = this.proxyGetAll(keys, stealInfoList);
        for (Map.Entry<ByteArray, List<Versioned<byte[]>>> keyValuePair : proxyKeyValues.entrySet()) {
            this.putLocallyIgnoringObsolete(keyValuePair.getKey(), keyValuePair.getValue());
        }
    }

    /**
     * Writes each of {@code values} into the wrapped store, continuing with the
     * next value when a put is rejected with {@link ObsoleteVersionException}.
     */
    private void putLocallyIgnoringObsolete(ByteArray key, List<Versioned<byte[]>> values) {
        for (Versioned<byte[]> value : values) {
            try {
                this.getInnerStore().put(key, value);
            } catch (ObsoleteVersionException ignored) {
                // Deliberately ignored. Presumably the wrapped store already
                // holds this version or a newer one, so the donor's copy has
                // nothing to add -- confirm against ObsoleteVersionException's
                // contract before changing this.
            }
        }
    }

    /**
     * Returns the socket store for {@code storeName} on the donor node, creating
     * and registering it in the store repository on first use.
     * <p>
     * Existence is checked once without a lock and again while holding the
     * {@code storeRepository} monitor, so creation happens inside the
     * synchronized section only when the second check still finds no store.
     *
     * @throws VoldemortException if the donor node cannot be looked up in the cluster
     */
    private Store<ByteArray, byte[]> getRedirectingSocketStore(String storeName, int donorNodeId) {
        if (!this.storeRepository.hasRedirectingSocketStore(storeName, donorNodeId)) {
            synchronized (this.storeRepository) {
                if (!this.storeRepository.hasRedirectingSocketStore(storeName, donorNodeId)) {
                    Node donorNode = this.getNodeIfPresent(donorNodeId);
                    logger.info("Creating redirectingSocketStore for donorNode " + donorNode + " store " + storeName);
                    SocketDestination destination = new SocketDestination(donorNode.getHost(), donorNode.getSocketPort(), RequestFormatType.PROTOCOL_BUFFERS);
                    SocketStore socketStore = new SocketStore(storeName, destination, this.socketPool, RequestRoutingType.IGNORE_CHECKS);
                    this.storeRepository.addRedirectingSocketStore(donorNode.getId(), socketStore);
                }
            }
        }
        return this.storeRepository.getRedirectingSocketStore(storeName, donorNodeId);
    }

    /**
     * Looks up the donor node in the current cluster.
     *
     * @throws VoldemortException wrapping any exception raised by the lookup, with
     *         the donor id, cluster and local node id in the message
     */
    private Node getNodeIfPresent(int donorId) {
        try {
            return this.metadata.getCluster().getNodeById(donorId);
        } catch (Exception e) {
            throw new VoldemortException("Failed to get donorNode " + donorId + " from current cluster " + this.metadata.getCluster() + " at node " + this.metadata.getNodeId(), e);
        }
    }

    /** Reports a failed donor call, with its elapsed time in milliseconds, to the failure detector. */
    private void recordException(Node node, long startNs, UnreachableStoreException e) {
        this.failureDetector.recordException(node, (System.nanoTime() - startNs) / NS_PER_MS, e);
    }

    /** Reports a successful donor call, with its elapsed time in milliseconds, to the failure detector. */
    private void recordSuccess(Node node, long startNs) {
        this.failureDetector.recordSuccess(node, (System.nanoTime() - startNs) / NS_PER_MS);
    }
}

