/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.balancer;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestBalancerWithHANameNodes {
    private MiniDFSCluster cluster;
    ClientProtocol client;
    private static final String[] TEST_RACKS = new String[]{"/rack0", "/rack1"};
    private static final long[] TEST_CAPACITIES = new long[]{5000L, 5000L};

    public static void waitStoragesNoStale(MiniDFSCluster cluster, ClientProtocol client, int nnIndex) throws Exception {
        cluster.triggerBlockReports();
        DatanodeInfo[] dataNodes = client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
        GenericTestUtils.waitFor(() -> {
            BlockManager bm = cluster.getNamesystem(nnIndex).getBlockManager();
            for (DatanodeInfo dn : dataNodes) {
                DatanodeStorageInfo[] storageInfos;
                for (DatanodeStorageInfo s : storageInfos = bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid()).getStorageInfos()) {
                    if (!s.areBlockContentsStale()) continue;
                    return false;
                }
            }
            return true;
        }, (long)300L, (long)60000L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testBalancerWithHANameNodes() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        TestBalancer.initConf((Configuration)conf);
        Assert.assertEquals((long)TEST_CAPACITIES.length, (long)TEST_RACKS.length);
        MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
        nn1Conf.setIpcPort(8020);
        Configuration copiedConf = new Configuration((Configuration)conf);
        this.cluster = new MiniDFSCluster.Builder(copiedConf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS).simulatedCapacities(TEST_CAPACITIES).build();
        HATestUtil.setFailoverConfigurations(this.cluster, (Configuration)conf);
        try {
            this.cluster.waitActive();
            this.cluster.transitionToActive(0);
            Thread.sleep(500L);
            this.client = (ClientProtocol)NameNodeProxies.createProxy((Configuration)conf, (URI)FileSystem.getDefaultUri((Configuration)conf), ClientProtocol.class).getProxy();
            this.doTest((Configuration)conf, true);
        }
        finally {
            this.cluster.shutdown();
        }
    }

    void doTest(Configuration conf) throws Exception {
        this.doTest(conf, false);
    }

    void doTest(Configuration conf, boolean withHA) throws Exception {
        boolean isRequestStandby;
        int numOfDatanodes = TEST_CAPACITIES.length;
        long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
        long totalUsedSpace = totalCapacity * 3L / 10L;
        TestBalancer.createFile(this.cluster, TestBalancer.filePath, totalUsedSpace / (long)numOfDatanodes, (short)numOfDatanodes, 0);
        boolean bl = isRequestStandby = !conf.getBoolean("dfs.namenode.get-blocks.check.operation", true);
        if (isRequestStandby) {
            HATestUtil.waitForStandbyToCatchUp(this.cluster.getNameNode(0), this.cluster.getNameNode(1));
        }
        if (withHA) {
            TestBalancerWithHANameNodes.waitStoragesNoStale(this.cluster, this.client, 0);
        }
        long newNodeCapacity = 5000L;
        String newNodeRack = "/rack2";
        this.cluster.startDataNodes(conf, 1, true, null, new String[]{newNodeRack}, new long[]{newNodeCapacity});
        TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity += newNodeCapacity, this.client, this.cluster);
        Collection namenodes = DFSUtil.getInternalNsRpcUris((Configuration)conf);
        Collection nsIds = DFSUtilClient.getNameServiceIds((Configuration)conf);
        Assert.assertEquals((long)1L, (long)namenodes.size());
        int r = Balancer.run((Collection)namenodes, (Collection)nsIds, (BalancerParameters)BalancerParameters.DEFAULT, (Configuration)conf);
        Assert.assertEquals((long)ExitStatus.SUCCESS.getExitCode(), (long)r);
        TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, this.client, this.cluster, BalancerParameters.DEFAULT);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testBalancerRequestSBNWithHA() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.namenode.get-blocks.check.operation", false);
        conf.setLong("dfs.ha.tail-edits.period", 1L);
        TestBalancer.initConf((Configuration)conf);
        Assert.assertEquals((long)TEST_CAPACITIES.length, (long)TEST_RACKS.length);
        MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
        nn1Conf.setIpcPort(8020);
        Configuration copiedConf = new Configuration((Configuration)conf);
        this.cluster = new MiniDFSCluster.Builder(copiedConf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS).simulatedCapacities(TEST_CAPACITIES).build();
        GenericTestUtils.LogCapturer log = GenericTestUtils.LogCapturer.captureLogs((Logger)LoggerFactory.getLogger(NameNodeConnector.class));
        HATestUtil.setFailoverConfigurations(this.cluster, (Configuration)conf);
        try {
            this.cluster.waitActive();
            this.cluster.transitionToActive(0);
            String standbyNameNode = this.cluster.getNameNode(1).getNameNodeAddress().getHostString();
            Thread.sleep(500L);
            this.client = (ClientProtocol)NameNodeProxies.createProxy((Configuration)conf, (URI)FileSystem.getDefaultUri((Configuration)conf), ClientProtocol.class).getProxy();
            this.doTest((Configuration)conf);
            Assert.assertTrue((boolean)log.getOutput().contains("Request #getBlocks to Standby NameNode success. remoteAddress: " + standbyNameNode));
            Assert.assertTrue((boolean)log.getOutput().contains("Request #getLiveDatanodeStorageReport to Standby NameNode success. remoteAddress: " + standbyNameNode));
        }
        finally {
            this.cluster.shutdown();
        }
    }

    @Test(timeout=120000L)
    public void testBalancerWithObserver() throws Exception {
        this.testBalancerWithObserver(false);
    }

    @Test(timeout=180000L)
    public void testBalancerWithObserverWithFailedNode() throws Exception {
        this.testBalancerWithObserver(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testBalancerWithObserver(boolean withObserverFailure) throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        TestBalancer.initConf((Configuration)conf);
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);
        conf.setBoolean("dfs.client.failover.random.order", false);
        if (withObserverFailure) {
            conf.setInt("ipc.client.connect.max.retries", 2);
        }
        MiniQJMHACluster qjmhaCluster = null;
        try {
            qjmhaCluster = HATestUtil.setUpObserverCluster((Configuration)conf, 2, TEST_CAPACITIES.length, true, TEST_CAPACITIES, TEST_RACKS);
            this.cluster = qjmhaCluster.getDfsCluster();
            this.cluster.waitClusterUp();
            this.cluster.waitActive();
            ArrayList<FSNamesystem> namesystemSpies = new ArrayList<FSNamesystem>();
            for (int i = 0; i < this.cluster.getNumNameNodes(); ++i) {
                namesystemSpies.add(NameNodeAdapter.spyOnNamesystem(this.cluster.getNameNode(i)));
            }
            if (withObserverFailure) {
                this.cluster.shutdownNameNode(2);
            }
            DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(this.cluster, (Configuration)conf, ObserverReadProxyProvider.class, true);
            this.client = dfs.getClient().getNamenode();
            this.doTest((Configuration)conf);
            for (int i = 0; i < this.cluster.getNumNameNodes(); ++i) {
                int expectedObserverIdx = withObserverFailure ? 3 : 2;
                int expectedCount = i == expectedObserverIdx ? 2 : 0;
                ((FSNamesystem)Mockito.verify((Object)((FSNamesystem)namesystemSpies.get(i)), (VerificationMode)Mockito.times((int)expectedCount))).getBlocks((DatanodeID)ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (StorageType)ArgumentMatchers.any());
            }
        }
        finally {
            if (qjmhaCluster != null) {
                qjmhaCluster.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testGetLiveDatanodeStorageReport() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        TestBalancer.initConf((Configuration)conf);
        Assert.assertEquals((long)TEST_CAPACITIES.length, (long)TEST_RACKS.length);
        MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
        nn1Conf.setIpcPort(8020);
        Configuration copiedConf = new Configuration((Configuration)conf);
        GenericTestUtils.LogCapturer log = GenericTestUtils.LogCapturer.captureLogs((Logger)LoggerFactory.getLogger(NameNodeConnector.class));
        copiedConf.setInt("dfs.heartbeat.interval", 60000);
        this.cluster = new MiniDFSCluster.Builder(copiedConf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS).simulatedCapacities(TEST_CAPACITIES).build();
        HATestUtil.setFailoverConfigurations(this.cluster, (Configuration)conf);
        try {
            this.cluster.waitActive();
            this.cluster.transitionToActive(0);
            URI namenode = (URI)DFSUtil.getInternalNsRpcUris((Configuration)conf).toArray()[0];
            String nsId = DFSUtilClient.getNameServiceIds((Configuration)conf).toArray()[0].toString();
            NameNodeConnector nncActive = new NameNodeConnector("nncActive", namenode, nsId, new Path("/test"), null, (Configuration)conf, 5);
            DatanodeStorageReport[] datanodeStorageReportFromAnn = nncActive.getLiveDatanodeStorageReport();
            Assert.assertTrue((!log.getOutput().contains("Request #getLiveDatanodeStorageReport to Standby NameNode success") ? 1 : 0) != 0);
            nncActive.close();
            conf.setBoolean("dfs.namenode.get-blocks.check.operation", false);
            NameNodeConnector nncStandby = new NameNodeConnector("nncStandby", namenode, nsId, new Path("/test"), null, (Configuration)conf, 5);
            DatanodeStorageReport[] datanodeStorageReportFromSnn = nncStandby.getLiveDatanodeStorageReport();
            Assert.assertTrue((boolean)log.getOutput().contains("Request #getLiveDatanodeStorageReport to Standby NameNode success"));
            nncStandby.close();
            Assert.assertEquals((Object)datanodeStorageReportFromAnn[0].getDatanodeInfo().getDatanodeReport(), (Object)datanodeStorageReportFromSnn[0].getDatanodeInfo().getDatanodeReport());
            Assert.assertEquals((Object)datanodeStorageReportFromAnn[1].getDatanodeInfo().getDatanodeReport(), (Object)datanodeStorageReportFromSnn[1].getDatanodeInfo().getDatanodeReport());
            for (int i = 0; i < TEST_CAPACITIES.length; ++i) {
                Assert.assertEquals((Object)datanodeStorageReportFromAnn[i].getStorageReports()[0].getStorage().toString(), (Object)datanodeStorageReportFromSnn[i].getStorageReports()[0].getStorage().toString());
                Assert.assertEquals((long)datanodeStorageReportFromAnn[i].getStorageReports()[0].getCapacity(), (long)datanodeStorageReportFromSnn[i].getStorageReports()[0].getCapacity());
                Assert.assertEquals((long)datanodeStorageReportFromAnn[i].getStorageReports()[0].getBlockPoolUsed(), (long)datanodeStorageReportFromSnn[i].getStorageReports()[0].getBlockPoolUsed());
                Assert.assertEquals((long)datanodeStorageReportFromAnn[i].getStorageReports()[0].getDfsUsed(), (long)datanodeStorageReportFromSnn[i].getStorageReports()[0].getDfsUsed());
                Assert.assertEquals((long)datanodeStorageReportFromAnn[i].getStorageReports()[0].getRemaining(), (long)datanodeStorageReportFromSnn[i].getStorageReports()[0].getRemaining());
                Assert.assertEquals((Object)datanodeStorageReportFromAnn[i].getStorageReports()[0].getMount(), (Object)datanodeStorageReportFromSnn[i].getStorageReports()[0].getMount());
                Assert.assertEquals((long)datanodeStorageReportFromAnn[i].getStorageReports()[0].getNonDfsUsed(), (long)datanodeStorageReportFromSnn[i].getStorageReports()[0].getNonDfsUsed());
                Assert.assertEquals((Object)datanodeStorageReportFromAnn[i].getStorageReports()[0].isFailed(), (Object)datanodeStorageReportFromSnn[i].getStorageReports()[0].isFailed());
            }
        }
        finally {
            this.cluster.shutdown();
        }
    }

    static {
        TestBalancer.initTestSetup();
    }
}

