/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.balancer;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.net.URI;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Formatter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.BalancingPolicy;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

@InterfaceAudience.Private
public class Balancer {
    /** Logger shared by the balancer and its nested classes. */
    static final Log LOG = LogFactory.getLog(Balancer.class);
    // 0x80000000L == 2^31; the same value is used as the upper bound of a single
    // getBlocks() request in Source.getBlockList().
    private static final long MAX_BLOCKS_SIZE_TO_FETCH = 0x80000000L;
    // Width of one moved-blocks window; overwritten by Cli.run() from
    // "dfs.balancer.movedWinWidth" and compared against Time.now() deltas in
    // MovedBlocks.cleanup() (presumably milliseconds, i.e. 1.5 hours by default).
    private static long WIN_WIDTH = 5400000L;
    // NOTE(review): not referenced in the visible part of this file; by its name it
    // bounds concurrent block moves per datanode -- confirm against BalancerDatanode.
    public static final int MAX_NUM_CONCURRENT_MOVES = 5;
    // NOTE(review): usage is outside the visible part of this file.
    private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
    // NOTE(review): usage is outside the visible part of this file.
    public static final long DELAY_AFTER_ERROR = 10000L;
    // NOTE(review): usage is outside the visible part of this file.
    public static final int BLOCK_MOVE_READ_TIMEOUT = 1200000;
    /** Command-line help text printed by {@code Cli.printUsage} and passed to the help-argument parser in {@code main}. */
    private static final String USAGE = "Usage: java " + Balancer.class.getSimpleName() + "\n\t[-policy <policy>]\tthe balancing policy: " + BalancingPolicy.Node.INSTANCE.getName() + " or " + BalancingPolicy.Pool.INSTANCE.getName() + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
    /** Connection to the namenode this balancer instance works against. */
    private final NameNodeConnector nnc;
    /** Policy used to accumulate space figures and compute per-node and average utilization. */
    private final BalancingPolicy policy;
    /** Allowed deviation from the average utilization, in the same unit as the policy's utilization values. */
    private final double threshold;
    // The four collections below partition the live, in-service datanodes by their
    // utilization relative to (average +/- threshold); they are filled by initNodes().
    private final Collection<Source> overUtilizedDatanodes = new LinkedList<Source>();
    private final Collection<Source> aboveAvgUtilizedDatanodes = new LinkedList<Source>();
    private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes = new LinkedList<BalancerDatanode>();
    private final Collection<BalancerDatanode> underUtilizedDatanodes = new LinkedList<BalancerDatanode>();
    // Nodes that were actually paired up by matchSourceWithTargetToMove() for this iteration.
    private final Collection<Source> sources = new HashSet<Source>();
    private final Collection<BalancerDatanode> targets = new HashSet<BalancerDatanode>();
    // Blocks seen so far, keyed by block; accessed under synchronized(globalBlockList) in Source.getBlockList().
    private final Map<Block, BalancerBlock> globalBlockList = new HashMap<Block, BalancerBlock>();
    /** Record of recently moved blocks, consulted so the same block is not picked again. */
    private final MovedBlocks movedBlocks = new MovedBlocks();
    /** Datanode UUID to balancer-side datanode wrapper, filled by initNodes(). */
    private final Map<String, BalancerDatanode> datanodeMap = new HashMap<String, BalancerDatanode>();
    /** Network topology; re-created by resetData(). */
    private NetworkTopology cluster;
    // Thread pools sized from "dfs.balancer.moverThreads" / "dfs.balancer.dispatcherThreads";
    // both are shut down in the finally block of run(int, Formatter, Configuration).
    private final ExecutorService moverExecutor;
    private final ExecutorService dispatcherExecutor;
    /** Matches two nodes that the topology reports as being in the same node group. */
    static final Matcher SAME_NODE_GROUP = new Matcher(){

        @Override
        public boolean match(NetworkTopology cluster, Node left, Node right) {
            return cluster.isOnSameNodeGroup(left, right);
        }
    };
    /** Matches two nodes that the topology reports as being on the same rack. */
    static final Matcher SAME_RACK = new Matcher(){

        @Override
        public boolean match(NetworkTopology cluster, Node left, Node right) {
            return cluster.isOnSameRack(left, right);
        }
    };
    /** Matches any two nodes that are not the same object (reference comparison; the topology is ignored). */
    static final Matcher ANY_OTHER = new Matcher(){

        @Override
        public boolean match(NetworkTopology cluster, Node left, Node right) {
            return left != right;
        }
    };
    /** Running total of bytes moved; dispatchBlockMoves() reports the delta per call. */
    private final BytesMoved bytesMoved = new BytesMoved();
    // Sleep interval used by waitForMoveCompletion() between polls (passed to Thread.sleep,
    // hence milliseconds); adjustable through setBlockMoveWaitTime().
    private static long blockMoveWaitTime = 30000L;

    /**
     * Ensures the configured block placement policy is one the balancer supports.
     *
     * @param conf configuration from which the placement policy is instantiated
     * @throws UnsupportedActionException if the policy is not a {@link BlockPlacementPolicyDefault}
     */
    private static void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
        if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof BlockPlacementPolicyDefault) {
            // Supported policy: nothing to report.
            return;
        }
        throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
    }

    Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
        this.threshold = p.threshold;
        this.policy = p.policy;
        this.nnc = theblockpool;
        this.cluster = NetworkTopology.getInstance((Configuration)conf);
        this.moverExecutor = Executors.newFixedThreadPool(conf.getInt("dfs.balancer.moverThreads", 1000));
        this.dispatcherExecutor = Executors.newFixedThreadPool(conf.getInt("dfs.balancer.dispatcherThreads", 200));
    }

    /**
     * Classifies the given datanodes by utilization and registers them with the
     * topology and {@code datanodeMap}.
     *
     * <p>Decommissioned and decommissioning nodes are ignored in both passes.
     *
     * @param datanodes datanode report to classify; the array is shuffled in place
     * @return the larger of the over-loaded and under-loaded byte totals, i.e. the
     *         number of bytes that must move for every node to be within the threshold
     */
    private long initNodes(DatanodeInfo[] datanodes) {
        // Pass 1: let the policy accumulate space figures so the average can be computed.
        for (DatanodeInfo info : datanodes) {
            if (!info.isDecommissioned() && !info.isDecommissionInProgress()) {
                policy.accumulateSpaces(info);
            }
        }
        policy.initAvgUtilization();

        // Pass 2: bucket every in-service node into exactly one of the four collections.
        long bytesAboveThreshold = 0L;
        long bytesBelowThreshold = 0L;
        for (DatanodeInfo info : DFSUtil.shuffle(datanodes)) {
            if (info.isDecommissioned() || info.isDecommissionInProgress()) {
                continue;
            }
            cluster.add((Node) info);
            final double avg = policy.getAvgUtilization();
            final BalancerDatanode node;
            if (policy.getUtilization(info) > avg) {
                // Above the average: a potential source of blocks.
                final Source src = new Source(info, policy, threshold);
                node = src;
                if (isAboveAvgUtilized(node)) {
                    aboveAvgUtilizedDatanodes.add(src);
                } else {
                    assert isOverUtilized(node) : node.getDisplayName() + "is not an overUtilized node";
                    overUtilizedDatanodes.add(src);
                    bytesAboveThreshold += (long) ((node.utilization - avg - threshold) * (double) node.datanode.getCapacity() / 100.0);
                }
            } else {
                // At or below the average: a potential target.
                node = new BalancerDatanode(info, policy, threshold);
                if (isBelowOrEqualAvgUtilized(node)) {
                    belowAvgUtilizedDatanodes.add(node);
                } else {
                    assert isUnderUtilized(node) : "isUnderUtilized(" + node.getDisplayName() + ")=" + isUnderUtilized(node) + ", utilization=" + node.utilization;
                    underUtilizedDatanodes.add(node);
                    bytesBelowThreshold += (long) ((avg - threshold - node.utilization) * (double) node.datanode.getCapacity() / 100.0);
                }
            }
            datanodeMap.put(info.getDatanodeUuid(), node);
        }

        logNodes();
        assert datanodeMap.size() == overUtilizedDatanodes.size() + underUtilizedDatanodes.size() + aboveAvgUtilizedDatanodes.size() + belowAvgUtilizedDatanodes.size() : "Mismatched number of datanodes";
        return Math.max(bytesAboveThreshold, bytesBelowThreshold);
    }

    /**
     * Logs the four utilization buckets. The two "near average" buckets are only
     * logged when trace logging is enabled.
     */
    private void logNodes() {
        logNodes("over-utilized", overUtilizedDatanodes);
        if (LOG.isTraceEnabled()) {
            logNodes("above-average", aboveAvgUtilizedDatanodes);
            logNodes("below-average", belowAvgUtilizedDatanodes);
        }
        logNodes("underutilized", underUtilizedDatanodes);
    }

    /**
     * Logs one bucket at INFO level as {@code "<count> <name>: <nodes>"}.
     *
     * @param name  label of the bucket
     * @param nodes members of the bucket
     */
    private static <T extends BalancerDatanode> void logNodes(String name, Collection<T> nodes) {
        final String line = nodes.size() + " " + name + ": " + nodes;
        LOG.info(line);
    }

    /**
     * Pairs sources with targets, preferring the closest topology match first
     * (node group when the cluster is node-group aware, then rack, then anything).
     *
     * @return total number of bytes scheduled to leave the chosen sources
     */
    private long chooseNodes() {
        if (cluster.isNodeGroupAware()) {
            chooseNodes(SAME_NODE_GROUP);
        }
        chooseNodes(SAME_RACK);
        chooseNodes(ANY_OTHER);

        assert datanodeMap.size() >= sources.size() + targets.size() : "Mismatched number of datanodes (" + datanodeMap.size() + " total, " + sources.size() + " sources, " + targets.size() + " targets)";

        long scheduledTotal = 0L;
        final Iterator<Source> it = sources.iterator();
        while (it.hasNext()) {
            scheduledTotal += it.next().getScheduledSize();
        }
        return scheduledTotal;
    }

    /**
     * Runs the three pairing rounds for one topology matcher, in this order:
     * over-utilized with under-utilized, over-utilized with below-average,
     * under-utilized with above-average.
     *
     * @param matcher topology relation the two paired nodes must satisfy
     */
    private void chooseNodes(Matcher matcher) {
        // over-utilized -> under-utilized
        chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
        // over-utilized -> below-average
        chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
        // under-utilized <- above-average
        chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
    }

    /**
     * For every node in {@code datanodes}, keeps pairing it with candidates until no
     * further candidate can be chosen; nodes left without scheduling room are removed
     * from {@code datanodes}.
     *
     * @param datanodes  nodes to find partners for (modified in place)
     * @param candidates possible partners (modified in place by the callee)
     * @param matcher    topology relation a pair must satisfy
     */
    private <D extends BalancerDatanode, C extends BalancerDatanode> void chooseDatanodes(Collection<D> datanodes, Collection<C> candidates, Matcher matcher) {
        for (Iterator<D> it = datanodes.iterator(); it.hasNext(); ) {
            final BalancerDatanode current = it.next();
            boolean paired;
            do {
                paired = chooseForOneDatanode(current, candidates, matcher);
            } while (paired);
            if (!current.hasSpaceForScheduling()) {
                it.remove();
            }
        }
    }

    /**
     * Tries to pair {@code dn} with one candidate and schedules a move between them.
     * Whichever of the two is a {@link Source} becomes the source of the move.
     *
     * @param dn         node to find a partner for
     * @param candidates possible partners; exhausted candidates are removed
     * @param matcher    topology relation the pair must satisfy
     * @return {@code true} if a partner was found and a move was scheduled
     */
    private <C extends BalancerDatanode> boolean chooseForOneDatanode(BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
        final Iterator<C> cursor = candidates.iterator();
        final BalancerDatanode partner = chooseCandidate(dn, cursor, matcher);
        if (partner == null) {
            return false;
        }

        if (!(dn instanceof Source)) {
            matchSourceWithTargetToMove((Source) partner, dn);
        } else {
            matchSourceWithTargetToMove((Source) dn, partner);
        }

        // The cursor still points at the chosen partner, so this drops exactly that element.
        if (!partner.hasSpaceForScheduling()) {
            cursor.remove();
        }
        return true;
    }

    /**
     * Schedules a move from {@code source} to {@code target} sized to whatever both
     * sides can still accommodate, and records both in the chosen sets.
     *
     * @param source node blocks will be taken from
     * @param target node blocks will be sent to
     */
    private void matchSourceWithTargetToMove(Source source, BalancerDatanode target) {
        final long sourceRoom = source.availableSizeToMove();
        final long targetRoom = target.availableSizeToMove();
        final long bytes = Math.min(sourceRoom, targetRoom);

        final NodeTask task = new NodeTask(target, bytes);
        source.addNodeTask(task);
        target.incScheduledSize(task.getSize());

        sources.add(source);
        targets.add(target);

        final String message = "Decided to move " + StringUtils.byteDesc(bytes) + " bytes from " + source.datanode.getName() + " to " + target.datanode.getName();
        LOG.info(message);
    }

    /**
     * Advances {@code candidates} to the first node that still has scheduling room and
     * satisfies {@code matcher} relative to {@code dn}. Candidates found without room
     * are removed through the iterator on the way.
     *
     * @param dn         node a partner is needed for
     * @param candidates iterator over possible partners; left positioned on the result
     * @param matcher    topology relation the pair must satisfy
     * @return the chosen candidate, or {@code null} if {@code dn} has no room or nothing matches
     */
    private <D extends BalancerDatanode, C extends BalancerDatanode> C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
        if (!dn.hasSpaceForScheduling()) {
            return null;
        }
        while (candidates.hasNext()) {
            final C candidate = candidates.next();
            if (!candidate.hasSpaceForScheduling()) {
                candidates.remove();
            } else if (matcher.match(cluster, dn.getDatanode(), candidate.getDatanode())) {
                return candidate;
            }
        }
        return null;
    }

    private long dispatchBlockMoves() throws InterruptedException {
        long bytesLastMoved = this.bytesMoved.get();
        Future[] futures = new Future[this.sources.size()];
        int i = 0;
        for (Source source : this.sources) {
            int n = i++;
            Source source2 = source;
            source2.getClass();
            futures[n] = this.dispatcherExecutor.submit(source2.new Source.BlockMoveDispatcher());
        }
        for (Future future : futures) {
            try {
                future.get();
            }
            catch (ExecutionException e) {
                LOG.warn((Object)"Dispatcher thread failed", e.getCause());
            }
        }
        this.waitForMoveCompletion();
        return this.bytesMoved.get() - bytesLastMoved;
    }

    /**
     * Overrides the interval {@code waitForMoveCompletion()} sleeps between polls.
     *
     * @param time new interval, passed unchanged to {@code Thread.sleep}
     */
    static void setBlockMoveWaitTime(long time) {
        Balancer.blockMoveWaitTime = time;
    }

    /**
     * Blocks until no target has pending block moves, polling every
     * {@code blockMoveWaitTime}. Interrupts during the sleep are ignored and the
     * polling simply continues.
     */
    private void waitForMoveCompletion() {
        while (true) {
            // Every target is queried on each pass, even after a busy one was found.
            boolean anyPending = false;
            for (BalancerDatanode target : targets) {
                if (!target.isPendingQEmpty()) {
                    anyPending = true;
                }
            }
            if (!anyPending) {
                return;
            }
            try {
                Thread.sleep(blockMoveWaitTime);
            } catch (InterruptedException ignored) {
                // deliberately ignored: keep waiting for the pending moves
            }
        }
    }

    /**
     * Decides whether {@code block} may be moved from {@code source} to {@code target}.
     *
     * <p>A block is rejected if it was moved recently, if the target already holds a
     * replica, or (on node-group aware clusters) if another replica shares the target's
     * node group. Otherwise it is accepted when any of the following holds:
     * <ul>
     *   <li>source and target are on the same rack;</li>
     *   <li>no replica is on the target's rack;</li>
     *   <li>some replica other than the source shares the source's rack.</li>
     * </ul>
     *
     * <p>Both scans of {@code block.locations} run under the block's monitor. The
     * original only guarded the first scan although locations are rewritten under that
     * same monitor by {@code Source.getBlockList()}.
     *
     * @param source node the block would be taken from
     * @param target node the block would be sent to
     * @param block  block under consideration
     * @return {@code true} if moving the block is acceptable
     */
    private boolean isGoodBlockCandidate(Source source, BalancerDatanode target, BalancerBlock block) {
        if (this.movedBlocks.contains(block)) {
            return false;
        }
        if (block.isLocatedOnDatanode(target)) {
            return false;
        }
        if (this.cluster.isNodeGroupAware() && this.isOnSameNodeGroupWithReplicas(target, block, source)) {
            return false;
        }
        if (this.cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
            // Same-rack moves never change the rack spread of the replicas.
            return true;
        }
        synchronized (block) {
            boolean replicaOnTargetRack = false;
            for (BalancerDatanode loc : block.locations) {
                if (this.cluster.isOnSameRack(loc.datanode, target.datanode)) {
                    replicaOnTargetRack = true;
                    break;
                }
            }
            if (!replicaOnTargetRack) {
                // The move puts a replica on a rack that had none.
                return true;
            }
            // Acceptable only if the source's rack keeps another replica after the move.
            for (BalancerDatanode loc : block.locations) {
                if (loc != source && this.cluster.isOnSameRack(loc.datanode, source.datanode)) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Checks whether any replica of {@code block}, other than the one on
     * {@code source}, lives in the same node group as {@code target}.
     *
     * <p>The location list is scanned under the block's monitor, the same lock
     * {@code Source.getBlockList()} holds while adding locations, so the scan cannot
     * observe a list that is being modified.
     *
     * @param target node the block would be sent to
     * @param block  block under consideration
     * @param source node the block would be taken from; its replica is ignored
     * @return {@code true} if such a replica exists
     */
    private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target, BalancerBlock block, Source source) {
        synchronized (block) {
            for (BalancerDatanode loc : block.locations) {
                if (loc != source && this.cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Clears all per-iteration state so the next iteration starts from a fresh
     * topology and empty node collections.
     *
     * @param conf configuration used to obtain a new topology instance
     */
    private void resetData(Configuration conf) {
        cluster = NetworkTopology.getInstance(conf);

        // utilization buckets
        overUtilizedDatanodes.clear();
        aboveAvgUtilizedDatanodes.clear();
        belowAvgUtilizedDatanodes.clear();
        underUtilizedDatanodes.clear();

        // lookup table and the pairs chosen in the last iteration
        datanodeMap.clear();
        sources.clear();
        targets.clear();

        policy.reset();

        // The block list is pruned against the moved-blocks record before that record is rotated.
        cleanGlobalBlockList();
        movedBlocks.cleanup();
    }

    /**
     * Drops every entry of {@code globalBlockList} whose block is not recorded in
     * {@code movedBlocks}; only recently moved blocks are kept.
     */
    private void cleanGlobalBlockList() {
        for (Iterator<Block> it = globalBlockList.keySet().iterator(); it.hasNext(); ) {
            final Block candidate = it.next();
            if (!movedBlocks.contains(candidate)) {
                it.remove();
            }
        }
    }

    /**
     * @param datanode node to classify
     * @return {@code true} if the node's utilization is strictly above average + threshold
     */
    private boolean isOverUtilized(BalancerDatanode datanode) {
        final double upperBound = policy.getAvgUtilization() + threshold;
        return datanode.utilization > upperBound;
    }

    /**
     * @param datanode node to classify
     * @return {@code true} if the node's utilization is in (average, average + threshold]
     */
    private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
        final double avg = policy.getAvgUtilization();
        final double used = datanode.utilization;
        if (used <= avg) {
            return false;
        }
        return used <= avg + threshold;
    }

    /**
     * @param datanode node to classify
     * @return {@code true} if the node's utilization is strictly below average - threshold
     */
    private boolean isUnderUtilized(BalancerDatanode datanode) {
        final double lowerBound = policy.getAvgUtilization() - threshold;
        return datanode.utilization < lowerBound;
    }

    /**
     * @param datanode node to classify
     * @return {@code true} if the node's utilization is in [average - threshold, average]
     */
    private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
        final double avg = policy.getAvgUtilization();
        final double used = datanode.utilization;
        if (used > avg) {
            return false;
        }
        return used >= avg - threshold;
    }

    /**
     * Executes one balancing iteration against this balancer's namenode: classify the
     * live datanodes, pair sources with targets, dispatch the block moves and report
     * the outcome. Both executors are shut down when the method exits, whatever the
     * result.
     *
     * @param iteration zero-based iteration number, printed in the progress line
     * @param formatter sink for the per-iteration progress line
     * @param conf      configuration (not read by this method)
     * @return the status of this iteration
     */
    private ReturnStatus run(int iteration, Formatter formatter, Configuration conf) {
        try {
            final DatanodeInfo[] liveNodes = nnc.client.getDatanodeReport(HdfsConstants.DatanodeReportType.LIVE);
            final long bytesLeftToMove = initNodes(liveNodes);
            if (bytesLeftToMove == 0L) {
                System.out.println("The cluster is balanced. Exiting...");
                return ReturnStatus.SUCCESS;
            }
            LOG.info("Need to move " + StringUtils.byteDesc(bytesLeftToMove) + " to make the cluster balanced.");

            final long bytesToMove = chooseNodes();
            if (bytesToMove == 0L) {
                System.out.println("No block can be moved. Exiting...");
                return ReturnStatus.NO_MOVE_BLOCK;
            }
            LOG.info("Will move " + StringUtils.byteDesc(bytesToMove) + " in this iteration");

            final String timestamp = DateFormat.getDateTimeInstance().format(new Date());
            formatter.format("%-24s %10d  %19s  %18s  %17s%n", timestamp, iteration, StringUtils.byteDesc(bytesMoved.get()), StringUtils.byteDesc(bytesLeftToMove), StringUtils.byteDesc(bytesToMove));

            // The connector decides, from the bytes moved, whether another iteration is worthwhile.
            return nnc.shouldContinue(dispatchBlockMoves()) ? ReturnStatus.IN_PROGRESS : ReturnStatus.NO_MOVE_PROGRESS;
        } catch (IllegalArgumentException e) {
            System.out.println(e + ".  Exiting ...");
            return ReturnStatus.ILLEGAL_ARGS;
        } catch (IOException e) {
            System.out.println(e + ".  Exiting ...");
            return ReturnStatus.IO_EXCEPTION;
        } catch (InterruptedException e) {
            System.out.println(e + ".  Exiting ...");
            return ReturnStatus.INTERRUPTED;
        } finally {
            dispatcherExecutor.shutdownNow();
            moverExecutor.shutdownNow();
        }
    }

    /**
     * Balances every given namenode, iterating until all of them report success or
     * one of them reports a terminal status. Between iterations that made progress
     * the method sleeps for two heartbeat intervals. All connectors are closed on exit.
     *
     * @param namenodes namenode URIs to balance
     * @param p         balancing parameters
     * @param conf      configuration
     * @return the exit code: {@code ReturnStatus.SUCCESS.code} or the code of the
     *         first non-success, non-in-progress status encountered
     * @throws IOException          if connecting to or closing a namenode connection fails
     * @throws InterruptedException if interrupted while sleeping between iterations
     */
    static int run(Collection<URI> namenodes, Parameters p, Configuration conf) throws IOException, InterruptedException {
        final long sleeptime = 2000L * conf.getLong("dfs.heartbeat.interval", 3L);
        LOG.info("namenodes = " + namenodes);
        LOG.info("p         = " + p);

        final Formatter formatter = new Formatter(System.out);
        System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");

        final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(namenodes.size());
        try {
            for (URI uri : namenodes) {
                connectors.add(new NameNodeConnector(uri, conf));
            }

            for (int iteration = 0; ; ++iteration) {
                boolean allBalanced = true;
                Collections.shuffle(connectors);
                for (NameNodeConnector connector : connectors) {
                    final Balancer balancer = new Balancer(connector, p, conf);
                    final ReturnStatus status = balancer.run(iteration, formatter, conf);
                    balancer.resetData(conf);
                    if (status == ReturnStatus.IN_PROGRESS) {
                        allBalanced = false;
                    } else if (status != ReturnStatus.SUCCESS) {
                        // Terminal failure for this namenode: give up on all of them.
                        return status.code;
                    }
                }
                if (allBalanced) {
                    return ReturnStatus.SUCCESS.code;
                }
                Thread.sleep(sleeptime);
            }
        } finally {
            for (NameNodeConnector connector : connectors) {
                connector.close();
            }
        }
    }

    /**
     * Renders an elapsed time using the largest unit (milliseconds, seconds, minutes
     * or hours) that the value reaches.
     *
     * @param elapsedTime elapsed time in milliseconds
     * @return the scaled value as a double, a space, and the unit name
     */
    private static String time2Str(long elapsedTime) {
        final String unit;
        final double divisor;
        if (elapsedTime >= 3600000L) {
            unit = "hours";
            divisor = 3600000.0;
        } else if (elapsedTime >= 60000L) {
            unit = "minutes";
            divisor = 60000.0;
        } else if (elapsedTime >= 1000L) {
            unit = "seconds";
            divisor = 1000.0;
        } else {
            unit = "milliseconds";
            divisor = 1.0;
        }
        final double scaled = (double) elapsedTime / divisor;
        return scaled + " " + unit;
    }

    /**
     * Command-line entry point. Prints usage and exits with 0 when help is requested;
     * otherwise runs the {@link Cli} tool and exits with its return code, or with -1
     * if anything is thrown.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        final boolean helpRequested = DFSUtil.parseHelpArgument(args, USAGE, System.out, true);
        if (helpRequested) {
            System.exit(0);
        }
        try {
            final int exitCode = ToolRunner.run(new HdfsConfiguration(), new Cli(), args);
            System.exit(exitCode);
        } catch (Throwable e) {
            LOG.error("Exiting balancer due an exception", e);
            System.exit(-1);
        }
    }

    static class Cli
    extends Configured
    implements Tool {
        Cli() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public int run(String[] args) {
            long startTime = Time.now();
            Configuration conf = this.getConf();
            WIN_WIDTH = conf.getLong("dfs.balancer.movedWinWidth", 5400000L);
            try {
                Balancer.checkReplicationPolicyCompatibility(conf);
                Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
                int n = Balancer.run(namenodes, Cli.parse(args), conf);
                return n;
            }
            catch (IOException e) {
                System.out.println(e + ".  Exiting ...");
                int n = ReturnStatus.IO_EXCEPTION.code;
                return n;
            }
            catch (InterruptedException e) {
                System.out.println(e + ".  Exiting ...");
                int n = ReturnStatus.INTERRUPTED.code;
                return n;
            }
            finally {
                System.out.println("Balancing took " + Balancer.time2Str(Time.now() - startTime));
            }
        }

        static Parameters parse(String[] args) {
            BalancingPolicy policy = Parameters.DEFALUT.policy;
            double threshold = Parameters.DEFALUT.threshold;
            if (args != null) {
                try {
                    for (int i = 0; i < args.length; ++i) {
                        if ("-threshold".equalsIgnoreCase(args[i])) {
                            ++i;
                            try {
                                threshold = Double.parseDouble(args[i]);
                                if (threshold < 1.0 || threshold > 100.0) {
                                    throw new IllegalArgumentException("Number out of range: threshold = " + threshold);
                                }
                                LOG.info((Object)("Using a threshold of " + threshold));
                                continue;
                            }
                            catch (IllegalArgumentException e) {
                                System.err.println("Expecting a number in the range of [1.0, 100.0]: " + args[i]);
                                throw e;
                            }
                        }
                        if ("-policy".equalsIgnoreCase(args[i])) {
                            ++i;
                            try {
                                policy = BalancingPolicy.parse(args[i]);
                                continue;
                            }
                            catch (IllegalArgumentException e) {
                                System.err.println("Illegal policy name: " + args[i]);
                                throw e;
                            }
                        }
                        throw new IllegalArgumentException("args = " + Arrays.toString(args));
                    }
                }
                catch (RuntimeException e) {
                    Cli.printUsage(System.err);
                    throw e;
                }
            }
            return new Parameters(policy, threshold);
        }

        private static void printUsage(PrintStream out) {
            out.println(USAGE + "\n");
        }
    }

    /** Immutable pair of balancing policy and threshold, as parsed from the command line. */
    static class Parameters {
        /** Defaults used when an option is absent (the misspelled name is referenced by {@code Cli.parse}). */
        static final Parameters DEFALUT = new Parameters(BalancingPolicy.Node.INSTANCE, 10.0);
        final BalancingPolicy policy;
        final double threshold;

        /**
         * @param policy    balancing policy to apply
         * @param threshold allowed deviation from the average utilization
         */
        Parameters(BalancingPolicy policy, double threshold) {
            this.threshold = threshold;
            this.policy = policy;
        }

        /** @return {@code Balancer.<class>[<policy>, threshold=<threshold>]} */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append(Balancer.class.getSimpleName()).append(".").append(getClass().getSimpleName());
            text.append("[").append(policy).append(", threshold=").append(threshold).append("]");
            return text.toString();
        }
    }

    /** Outcome of a balancing iteration, together with the process exit code it maps to. */
    static enum ReturnStatus {
        /** The cluster is balanced. */
        SUCCESS(0),
        /** Blocks were moved and another iteration is needed. */
        IN_PROGRESS(1),
        ALREADY_RUNNING(-1),
        /** No source/target pair could be scheduled. */
        NO_MOVE_BLOCK(-2),
        /** The namenode connector decided not to continue after the last dispatch. */
        NO_MOVE_PROGRESS(-3),
        IO_EXCEPTION(-4),
        ILLEGAL_ARGS(-5),
        INTERRUPTED(-6);

        /** Exit code reported to the caller. */
        final int code;

        ReturnStatus(int code) {
            this.code = code;
        }
    }

    /**
     * Two-window record of recently moved blocks. New entries go into the current
     * window; {@link #cleanup()} turns the current window into the old one (discarding
     * the previous old window) once {@code WIN_WIDTH} has elapsed since the last rotation.
     */
    private static class MovedBlocks {
        private long lastCleanupTime = Time.now();
        private static final int CUR_WIN = 0;
        private static final int OLD_WIN = 1;
        private static final int NUM_WINS = 2;
        private final List<HashMap<Block, BalancerBlock>> movedBlocks = new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);

        private MovedBlocks() {
            for (int window = 0; window < NUM_WINS; ++window) {
                movedBlocks.add(new HashMap<Block, BalancerBlock>());
            }
        }

        /** Records {@code block} in the current window. */
        private synchronized void add(BalancerBlock block) {
            final HashMap<Block, BalancerBlock> current = movedBlocks.get(CUR_WIN);
            current.put(block.getBlock(), block);
        }

        /** @return whether the block wrapped by {@code block} is recorded in either window */
        private synchronized boolean contains(BalancerBlock block) {
            return contains(block.getBlock());
        }

        /** @return whether {@code block} is recorded in either window */
        private synchronized boolean contains(Block block) {
            if (movedBlocks.get(CUR_WIN).containsKey(block)) {
                return true;
            }
            return movedBlocks.get(OLD_WIN).containsKey(block);
        }

        /** Rotates the windows if at least {@code WIN_WIDTH} has passed since the last rotation. */
        private synchronized void cleanup() {
            final long curTime = Time.now();
            if (lastCleanupTime + WIN_WIDTH > curTime) {
                return;
            }
            movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
            movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
            lastCleanupTime = curTime;
        }
    }

    /** Counter of moved bytes; both operations lock on the counter instance. */
    private static class BytesMoved {
        private long bytesMoved = 0L;

        private BytesMoved() {
        }

        /** Adds {@code bytes} to the running total. */
        private void inc(long bytes) {
            synchronized (this) {
                bytesMoved = bytesMoved + bytes;
            }
        }

        /** @return the running total */
        private long get() {
            synchronized (this) {
                return bytesMoved;
            }
        }
    }

    /** Predicate over two topology nodes, used to restrict which source/target pairs may be formed. */
    private static interface Matcher {
        /**
         * @param cluster topology both nodes belong to
         * @param left    first node
         * @param right   second node
         * @return whether the two nodes satisfy this matcher's relation
         */
        public boolean match(NetworkTopology cluster, Node left, Node right);
    }

    private class Source
    extends BalancerDatanode {
        /** Moves scheduled from this source, one entry per target (see addNodeTask). */
        private final ArrayList<NodeTask> nodeTasks;
        // Remaining number of bytes worth of block listings to request from the namenode;
        // set by dispatchBlocks() and decreased by what getBlockList() returns.
        private long blocksToReceive;
        /** Blocks on this source that are candidates for being moved. */
        private final List<BalancerBlock> srcBlockList;
        // Below this many candidate blocks, shouldFetchMoreBlocks() asks for more.
        private static final int SOURCE_BLOCK_LIST_MIN_SIZE = 5;
        // NOTE(review): usage is outside the visible part of this file; by its name it
        // bounds the duration of one dispatch loop -- confirm in dispatchBlocks().
        private static final long MAX_ITERATION_TIME = 1200000L;

        /**
         * Creates a source wrapper with no scheduled tasks and an empty candidate block list.
         *
         * @param node      datanode being wrapped
         * @param policy    balancing policy, forwarded to the superclass
         * @param threshold balancing threshold, forwarded to the superclass
         */
        private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
            super(node, policy, threshold);
            nodeTasks = new ArrayList<NodeTask>(2);
            srcBlockList = new ArrayList<BalancerBlock>();
            blocksToReceive = 0L;
        }

        /**
         * Registers a scheduled move towards another node and accounts for its size
         * in this source's scheduled total.
         *
         * @param task the move to add; its target must not be this source
         */
        private void addNodeTask(NodeTask task) {
            assert task.datanode != this : "Source and target are the same " + datanode;
            final long taskSize = task.getSize();
            incScheduledSize(taskSize);
            nodeTasks.add(task);
        }

        /** @return an iterator over this source's candidate blocks; removal goes through to the list */
        private Iterator<BalancerBlock> getBlockIterator() {
            final Iterator<BalancerBlock> blocks = srcBlockList.iterator();
            return blocks;
        }

        /**
         * Fetches a batch of blocks stored on this source from the namenode, refreshes
         * their locations in the balancer-wide block list and adds those that can be
         * moved to one of this source's targets to the candidate list.
         *
         * <p>Locations whose datanode UUID is not in {@code datanodeMap} are skipped.
         * The original tested {@code this.datanode} instead of the looked-up node, so
         * unknown datanodes were added as {@code null} locations.
         *
         * @return total size in bytes of the blocks returned by the namenode
         * @throws IOException if the namenode call fails
         */
        private long getBlockList() throws IOException {
            // Never ask for more than MAX_BLOCKS_SIZE_TO_FETCH bytes worth of blocks at once.
            final long sizeToFetch = Math.min(Balancer.MAX_BLOCKS_SIZE_TO_FETCH, this.blocksToReceive);
            final BlocksWithLocations.BlockWithLocations[] newBlocks = Balancer.this.nnc.namenode.getBlocks(this.datanode, sizeToFetch).getBlocks();
            long bytesReceived = 0L;
            for (BlocksWithLocations.BlockWithLocations blk : newBlocks) {
                bytesReceived += blk.getBlock().getNumBytes();
                synchronized (Balancer.this.globalBlockList) {
                    BalancerBlock block = Balancer.this.globalBlockList.get(blk.getBlock());
                    if (block == null) {
                        block = new BalancerBlock(blk.getBlock());
                        Balancer.this.globalBlockList.put(blk.getBlock(), block);
                    } else {
                        // Known block: its locations are replaced by the fresh report below.
                        block.clearLocations();
                    }
                    synchronized (block) {
                        for (String datanodeUuid : blk.getDatanodeUuids()) {
                            final BalancerDatanode d = Balancer.this.datanodeMap.get(datanodeUuid);
                            if (d != null) {
                                // Only datanodes known to this balancer count as locations.
                                block.addLocation(d);
                            }
                        }
                    }
                    if (!this.srcBlockList.contains(block) && this.isGoodBlockCandidate(block)) {
                        this.srcBlockList.add(block);
                    }
                }
            }
            return bytesReceived;
        }

        /**
         * @param block block stored on this source
         * @return whether the block may be moved to at least one of this source's scheduled targets
         */
        private boolean isGoodBlockCandidate(BalancerBlock block) {
            boolean movable = false;
            final Iterator<NodeTask> tasks = nodeTasks.iterator();
            while (!movable && tasks.hasNext()) {
                movable = Balancer.this.isGoodBlockCandidate(this, tasks.next().datanode, block);
            }
            return movable;
        }

        /**
         * Picks the next block to move from this source, trying the scheduled targets
         * in order. The chosen block's size is deducted from both this source's
         * scheduled total and the task's remaining quota.
         *
         * <p>A task is retired as soon as its quota is used up. Because a block can be
         * larger than the remaining quota, the quota may go negative; retiring only on
         * exactly zero would keep such a task in the list.
         *
         * @return the pending move, or {@code null} if no target can take a block right now
         */
        private PendingBlockMove chooseNextBlockToMove() {
            final Iterator<NodeTask> tasks = this.nodeTasks.iterator();
            while (tasks.hasNext()) {
                final NodeTask task = tasks.next();
                final BalancerDatanode target = task.getDatanode();
                final PendingBlockMove pendingBlock = new PendingBlockMove();
                if (!target.addPendingBlock(pendingBlock)) {
                    // Target cannot accept another pending move; try the next one.
                    continue;
                }
                pendingBlock.source = this;
                pendingBlock.target = target;
                if (pendingBlock.chooseBlockAndProxy()) {
                    final long blockSize = pendingBlock.block.getNumBytes();
                    this.decScheduledSize(blockSize);
                    task.size -= blockSize;
                    if (task.size <= 0L) {
                        tasks.remove();
                    }
                    return pendingBlock;
                }
                // No suitable block for this target: release the slot reserved above.
                target.removePendingBlock(pendingBlock);
            }
            return null;
        }

        /**
         * Drops from this source's block iterator every block that the enclosing
         * balancer's {@code movedBlocks} already contains.
         */
        private void filterMovedBlocks() {
            for (Iterator<BalancerBlock> it = this.getBlockIterator(); it.hasNext(); ) {
                BalancerBlock candidate = it.next();
                if (Balancer.this.movedBlocks.contains(candidate)) {
                    it.remove();
                }
            }
        }

        /**
         * Decides whether another block list should be requested.
         *
         * @return {@code true} only when fewer than 5 blocks are held in
         *         {@code srcBlockList} and {@code blocksToReceive} is still positive
         */
        private boolean shouldFetchMoreBlocks() {
            if (this.srcBlockList.size() >= 5) {
                return false;
            }
            return this.blocksToReceive > 0L;
        }

        /**
         * Main scheduling loop for this source.
         *
         * <p>Starts by setting {@code blocksToReceive} to twice the current
         * scheduled size, then loops while the time limit has not been hit, the
         * scheduled size is positive, and there is either a block in
         * {@code srcBlockList} or more to receive. Each pass:
         * <ol>
         *   <li>schedules a move if {@code chooseNextBlockToMove()} yields one;</li>
         *   <li>otherwise filters already-moved blocks and, if
         *       {@code shouldFetchMoreBlocks()} says so, fetches another block
         *       list (an {@link IOException} there is logged and ends the
         *       method);</li>
         *   <li>otherwise counts an idle pass (the scheduled size is forced to 0
         *       once 5 idle passes have accumulated), flags a timeout once more
         *       than 1200000 ms have elapsed since the start, or else waits up to
         *       1000 ms on the enclosing balancer's monitor.</li>
         * </ol>
         */
        private void dispatchBlocks() {
            final long begin = Time.now();
            this.blocksToReceive = 2L * this.getScheduledSize();
            int idlePasses = 0;
            boolean timedOut = false;
            while (!timedOut
                    && this.getScheduledSize() > 0L
                    && (!this.srcBlockList.isEmpty() || this.blocksToReceive > 0L)) {
                PendingBlockMove move = this.chooseNextBlockToMove();
                if (move != null) {
                    move.scheduleBlockMove();
                    continue;
                }

                // Nothing could be scheduled: prune moved blocks, then try to refill.
                this.filterMovedBlocks();
                if (this.shouldFetchMoreBlocks()) {
                    try {
                        this.blocksToReceive -= this.getBlockList();
                    }
                    catch (IOException e) {
                        LOG.warn((Object)"Exception while getting block list", (Throwable)e);
                        return;
                    }
                    continue;
                }

                idlePasses++;
                if (idlePasses >= 5) {
                    // Zeroing the scheduled size makes the loop condition fail.
                    this.setScheduledSize(0L);
                }

                if (Time.now() - begin > 1200000L) {
                    timedOut = true;
                    continue;
                }

                try {
                    synchronized (Balancer.this) {
                        Balancer.this.wait(1000L);
                    }
                }
                catch (InterruptedException ignored) {
                    // Interrupts are swallowed here; the loop simply re-checks its conditions.
                }
            }
        }

        /**
         * {@link Runnable} adapter whose {@code run()} executes the enclosing
         * source's {@code dispatchBlocks()} loop.
         */
        private class BlockMoveDispatcher
        implements Runnable {
            private BlockMoveDispatcher() {
            }

            /** Runs the enclosing source's block dispatch loop. */
            @Override
            public void run() {
                dispatchBlocks();
            }
        }
    }

    /**
     * Balancer-side bookkeeping for one datanode: its utilization, the maximum
     * number of bytes that may be moved, the bytes currently scheduled, an
     * optional delay deadline, and the queue of pending block moves.
     *
     * <p>All accessors of the mutable state ({@code scheduledSize},
     * {@code delayUntil}, {@code pendingBlocks}) are {@code synchronized} on
     * this instance.
     */
    private static class BalancerDatanode {
        /** Upper bound on {@link #maxSize2Move}: 0x280000000 bytes (10 GiB). */
        private static final long MAX_SIZE_TO_MOVE = 0x280000000L;
        final DatanodeInfo datanode;
        final double utilization;
        final long maxSize2Move;
        private long scheduledSize = 0L;
        /** Time (as reported by {@code Time.now()}) until which the node is delayed; 0 means no delay. */
        protected long delayUntil = 0L;
        private final List<PendingBlockMove> pendingBlocks = new ArrayList<PendingBlockMove>(5);

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(this.getClass().getSimpleName());
            text.append("[").append(this.datanode);
            text.append(", utilization=").append(this.utilization);
            text.append("]");
            return text.toString();
        }

        /**
         * Computes the node's utilization and its move budget.
         *
         * <p>The budget is a percentage of the node's capacity: {@code threshold}
         * percent when the utilization is at least {@code threshold} away from
         * the average, otherwise the absolute distance to the average. For nodes
         * below the average the budget is additionally capped by the node's
         * remaining space, and in every case by {@link #MAX_SIZE_TO_MOVE}.
         *
         * @param node      the datanode being tracked
         * @param policy    supplies the node's utilization and the average utilization
         * @param threshold utilization threshold, in percent
         */
        private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
            this.datanode = node;
            this.utilization = policy.getUtilization(node);
            double avgUtil = policy.getAvgUtilization();

            boolean outsideBand = this.utilization >= avgUtil + threshold
                    || this.utilization <= avgUtil - threshold;
            double percent = outsideBand ? threshold : Math.abs(avgUtil - this.utilization);
            long budget = (long)(percent * (double)this.datanode.getCapacity() / 100.0);

            if (this.utilization < avgUtil) {
                budget = Math.min(this.datanode.getRemaining(), budget);
            }
            this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, budget);
        }

        /** @return the wrapped datanode descriptor */
        protected DatanodeInfo getDatanode() {
            return this.datanode;
        }

        /** @return the wrapped datanode's {@code toString()} */
        protected String getDisplayName() {
            return this.datanode.toString();
        }

        /** @return the wrapped datanode's UUID */
        protected String getStorageID() {
            return this.datanode.getDatanodeUuid();
        }

        /** @return {@code true} while the scheduled size is below the move budget */
        protected synchronized boolean hasSpaceForScheduling() {
            return this.maxSize2Move > this.scheduledSize;
        }

        /** @return the move budget minus the currently scheduled size */
        protected synchronized long availableSizeToMove() {
            return this.maxSize2Move - this.scheduledSize;
        }

        /** Adds {@code size} to the scheduled size. */
        protected synchronized void incScheduledSize(long size) {
            this.scheduledSize = this.scheduledSize + size;
        }

        /** Subtracts {@code size} from the scheduled size. */
        protected synchronized void decScheduledSize(long size) {
            this.scheduledSize = this.scheduledSize - size;
        }

        /** @return the currently scheduled size */
        protected synchronized long getScheduledSize() {
            return this.scheduledSize;
        }

        /** Overwrites the scheduled size with {@code size}. */
        protected synchronized void setScheduledSize(long size) {
            this.scheduledSize = size;
        }

        /** Starts a delay that lasts {@code delta} past the current {@code Time.now()}. */
        private synchronized void activateDelay(long delta) {
            this.delayUntil = Time.now() + delta;
        }

        /**
         * Reports whether a delay is in effect; an absent or expired delay is
         * reset to 0 as a side effect.
         */
        private synchronized boolean isDelayActive() {
            if (this.delayUntil != 0L && Time.now() <= this.delayUntil) {
                return true;
            }
            this.delayUntil = 0L;
            return false;
        }

        /** @return {@code true} while fewer than 5 moves are pending */
        private synchronized boolean isPendingQNotFull() {
            return this.pendingBlocks.size() < 5;
        }

        /** @return {@code true} when no moves are pending */
        private synchronized boolean isPendingQEmpty() {
            return this.pendingBlocks.isEmpty();
        }

        /**
         * Queues {@code pendingBlock} unless a delay is active or the queue is full.
         *
         * @return {@code true} if the move was queued
         */
        private synchronized boolean addPendingBlock(PendingBlockMove pendingBlock) {
            if (this.isDelayActive() || !this.isPendingQNotFull()) {
                return false;
            }
            return this.pendingBlocks.add(pendingBlock);
        }

        /**
         * Removes {@code pendingBlock} from the queue.
         *
         * @return {@code true} if it was present
         */
        private synchronized boolean removePendingBlock(PendingBlockMove pendingBlock) {
            return this.pendingBlocks.remove(pendingBlock);
        }
    }

    /**
     * Pairs a datanode with a byte count; {@code size} is mutable so callers
     * can decrement it as blocks are scheduled.
     */
    private static class NodeTask {
        private final BalancerDatanode datanode;
        private long size;

        /**
         * @param datanode the datanode this task refers to
         * @param size     the initial byte count of the task
         */
        private NodeTask(BalancerDatanode datanode, long size) {
            this.size = size;
            this.datanode = datanode;
        }

        /** @return the datanode this task refers to */
        private BalancerDatanode getDatanode() {
            return datanode;
        }

        /** @return the task's current byte count */
        private long getSize() {
            return size;
        }
    }

    /**
     * A block together with the datanodes it is known to be located on.
     *
     * <p>Every method that touches the location list is {@code synchronized}
     * on this instance.
     */
    private static class BalancerBlock {
        private final Block block;
        private final List<BalancerDatanode> locations = new ArrayList<BalancerDatanode>(3);

        /** @param block the underlying block */
        private BalancerBlock(Block block) {
            this.block = block;
        }

        /** Forgets all recorded locations. */
        private synchronized void clearLocations() {
            this.locations.clear();
        }

        /**
         * Records {@code datanode} as a location of this block, ignoring
         * duplicates.
         *
         * <p>A {@code null} datanode is ignored: callers obtain the argument
         * from a map lookup that can miss, and a {@code null} entry in the list
         * would make later iteration over {@link #getLocations()} dereference
         * {@code null}.
         *
         * @param datanode the location to add; may be {@code null}
         */
        private synchronized void addLocation(BalancerDatanode datanode) {
            if (datanode == null) {
                return;
            }
            if (!this.locations.contains(datanode)) {
                this.locations.add(datanode);
            }
        }

        /** @return {@code true} if {@code datanode} is a recorded location */
        private synchronized boolean isLocatedOnDatanode(BalancerDatanode datanode) {
            return this.locations.contains(datanode);
        }

        /**
         * @return the internal (live, mutable) location list; callers that
         *         iterate it are not protected by this method's lock once it
         *         returns
         */
        private synchronized List<BalancerDatanode> getLocations() {
            return this.locations;
        }

        /** @return the underlying block */
        private Block getBlock() {
            return this.block;
        }

        /** @return the underlying block's byte count */
        private long getNumBytes() {
            return this.block.getNumBytes();
        }
    }

    /**
     * One in-flight block move: the block, the source it leaves, the target it
     * goes to, and the proxy datanode the replica is copied through.
     *
     * <p>All four fields are {@code null} until chosen and are cleared again by
     * {@link #reset()} once {@link #dispatch()} has finished.
     */
    private class PendingBlockMove {
        private BalancerBlock block;
        private Source source;
        private BalancerDatanode proxySource;
        private BalancerDatanode target;

        private PendingBlockMove() {
        }

        /**
         * Describes the move. Dereferences all four fields, so it must only be
         * called while they are populated (i.e. before {@link #reset()}).
         */
        @Override
        public String toString() {
            Block b = this.block.getBlock();
            return b + " with size=" + b.getNumBytes() + " from " + this.source.getDisplayName() + " to " + this.target.getDisplayName() + " through " + this.proxySource.getDisplayName();
        }

        /**
         * Scans the source's blocks for the first one that can be moved to the
         * target and for which a proxy is available; that block is removed from
         * the source's iterator.
         *
         * @return {@code true} if a block and proxy were chosen
         */
        private boolean chooseBlockAndProxy() {
            Iterator<BalancerBlock> blocks = this.source.getBlockIterator();
            while (blocks.hasNext()) {
                if (!this.markMovedIfGoodBlock(blocks.next())) continue;
                blocks.remove();
                return true;
            }
            return false;
        }

        /**
         * Under the locks of {@code block} and of the balancer's
         * {@code movedBlocks}, checks whether {@code block} is a good candidate
         * for this source/target pair and, if a proxy can be reserved, records
         * it in {@code movedBlocks}.
         *
         * <p>Note that {@code this.block} is assigned before the proxy is
         * chosen and is left set even when no proxy is found.
         *
         * @param block the candidate block
         * @return {@code true} if the block was accepted and marked as moved
         */
        private boolean markMovedIfGoodBlock(BalancerBlock block) {
            synchronized (block) {
                synchronized (Balancer.this.movedBlocks) {
                    if (Balancer.this.isGoodBlockCandidate(this.source, this.target, block)) {
                        this.block = block;
                        if (this.chooseProxySource()) {
                            Balancer.this.movedBlocks.add(block);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug((Object)("Decided to move " + this));
                            }
                            return true;
                        }
                    }
                }
            }
            return false;
        }

        /**
         * Picks a proxy among the block's locations, preferring (in order) a
         * location in the target's node group (only when the cluster is
         * node-group aware), then one on the target's rack, then any location
         * that accepts the pending move.
         *
         * @return {@code true} if a proxy was reserved
         */
        private boolean chooseProxySource() {
            DatanodeInfo targetDN = this.target.getDatanode();
            if (Balancer.this.cluster.isNodeGroupAware()) {
                for (BalancerDatanode loc : this.block.getLocations()) {
                    if (!Balancer.this.cluster.isOnSameNodeGroup((Node)loc.getDatanode(), (Node)targetDN) || !this.addTo(loc)) continue;
                    return true;
                }
            }
            for (BalancerDatanode loc : this.block.getLocations()) {
                if (!Balancer.this.cluster.isOnSameRack((Node)loc.getDatanode(), (Node)targetDN) || !this.addTo(loc)) continue;
                return true;
            }
            for (BalancerDatanode loc : this.block.getLocations()) {
                if (!this.addTo(loc)) continue;
                return true;
            }
            return false;
        }

        /**
         * Tries to queue this move on {@code bdn}; on success {@code bdn}
         * becomes the proxy source.
         *
         * @return {@code true} if {@code bdn} accepted the move
         */
        private boolean addTo(BalancerDatanode bdn) {
            if (bdn.addPendingBlock(this)) {
                this.proxySource = bdn;
                return true;
            }
            return false;
        }

        /**
         * Performs the move: connects to the target datanode (60000 ms connect
         * timeout, 1200000 ms read timeout, keep-alive on), optionally wraps the
         * streams for encryption, sends the replace-block request and reads the
         * response.
         *
         * <p>On {@link IOException} the failure is logged and a 10000 ms delay
         * is activated on both the proxy and the target. In every case the
         * streams and socket are closed, this move is removed from the proxy's
         * and target's pending queues, the fields are reset, and threads waiting
         * on the balancer's monitor are notified.
         *
         * <p>Cleanup lives exclusively in the {@code finally} block. Repeating
         * it after the try statement would dereference {@code proxySource} and
         * {@code target} after {@link #reset()} has nulled them.
         */
        private void dispatch() {
            Socket sock = new Socket();
            DataOutputStream out = null;
            DataInputStream in = null;
            try {
                sock.connect(NetUtils.createSocketAddr(this.target.datanode.getXferAddr()), 60000);
                sock.setSoTimeout(1200000);
                sock.setKeepAlive(true);
                OutputStream unbufOut = sock.getOutputStream();
                InputStream unbufIn = sock.getInputStream();
                if (Balancer.this.nnc.getDataEncryptionKey() != null) {
                    IOStreamPair encryptedStreams = DataTransferEncryptor.getEncryptedStreams(unbufOut, unbufIn, Balancer.this.nnc.getDataEncryptionKey());
                    unbufOut = encryptedStreams.out;
                    unbufIn = encryptedStreams.in;
                }
                out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.IO_FILE_BUFFER_SIZE));
                in = new DataInputStream(new BufferedInputStream(unbufIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
                this.sendRequest(out);
                this.receiveResponse(in);
                Balancer.this.bytesMoved.inc(this.block.getNumBytes());
                LOG.info((Object)("Successfully moved " + this));
            }
            catch (IOException e) {
                LOG.warn((Object)("Failed to move " + this + ": " + e.getMessage()));
                // Back off both ends so they are not immediately chosen again.
                this.proxySource.activateDelay(10000L);
                this.target.activateDelay(10000L);
            }
            finally {
                IOUtils.closeStream(out);
                IOUtils.closeStream(in);
                IOUtils.closeSocket(sock);
                this.proxySource.removePendingBlock(this);
                this.target.removePendingBlock(this);
                synchronized (this) {
                    this.reset();
                }
                synchronized (Balancer.this) {
                    Balancer.this.notifyAll();
                }
            }
        }

        /**
         * Sends the replace-block request for this move's block, naming the
         * source's storage ID and the proxy datanode.
         *
         * @param out stream connected to the target datanode
         * @throws IOException if obtaining the access token or sending fails
         */
        private void sendRequest(DataOutputStream out) throws IOException {
            ExtendedBlock eb = new ExtendedBlock(Balancer.this.nnc.blockpoolID, this.block.getBlock());
            Token<BlockTokenIdentifier> accessToken = Balancer.this.nnc.getAccessToken(eb);
            new Sender(out).replaceBlock(eb, accessToken, this.source.getStorageID(), this.proxySource.getDatanode());
        }

        /**
         * Reads the datanode's response and fails unless its status is
         * {@code SUCCESS}.
         *
         * @param in stream connected to the target datanode
         * @throws IOException if the response cannot be read or reports a
         *         non-success status (with a dedicated message for
         *         {@code ERROR_ACCESS_TOKEN})
         */
        private void receiveResponse(DataInputStream in) throws IOException {
            DataTransferProtos.BlockOpResponseProto response = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
            if (response.getStatus() != DataTransferProtos.Status.SUCCESS) {
                if (response.getStatus() == DataTransferProtos.Status.ERROR_ACCESS_TOKEN) {
                    throw new IOException("block move failed due to access token error");
                }
                throw new IOException("block move is failed: " + response.getMessage());
            }
        }

        /** Clears all four fields. */
        private void reset() {
            this.block = null;
            this.source = null;
            this.proxySource = null;
            this.target = null;
        }

        /**
         * Submits this move to the balancer's mover executor; the submitted
         * task logs at debug level and then calls {@link #dispatch()}.
         */
        private void scheduleBlockMove() {
            Balancer.this.moverExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Start moving " + PendingBlockMove.this));
                    }
                    PendingBlockMove.this.dispatch();
                }
            });
        }
    }
}

