/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.base.Supplier;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.MiniHDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HAStressTestHarness;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

/**
 * Tests that exercise HDFS write pipelines, lease recovery and block recovery
 * on an HA mini cluster while the active NameNode changes between NN 0 and NN 1.
 *
 * <p>Every test builds its own cluster, makes NN 0 active, and always shuts the
 * cluster down (and closes any open output stream) in a {@code finally} block.
 */
public class TestPipelinesFailover {
    static {
        // Raise the log level of the components involved in failover to ALL.
        // This block is deliberately placed before the field initializers so the
        // class-initialization order stays: log levels first, then LOG/TEST_PATH.
        ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.io.retry.RetryInvocationHandler")).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
    }

    protected static final Log LOG = LogFactory.getLog(TestPipelinesFailover.class);

    /** File written by every non-stress test. */
    private static final Path TEST_PATH = new Path("/test-file");

    /** Value the tests set for {@code dfs.blocksize}. */
    private static final int BLOCK_SIZE = 4096;

    /** One and a half blocks (6144 bytes); the unit in which most tests write. */
    private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;

    /** Number of writer threads started by the stress test. */
    private static final int STRESS_NUM_THREADS = 25;

    /** How long, in milliseconds, the stress test waits on its writer threads. */
    private static final int STRESS_RUNTIME = 40000;

    /** Writes across a graceful failover and then writes a further block and a half. */
    @Test(timeout=30000L)
    public void testWriteOverGracefulFailover() throws Exception {
        doWriteOverFailoverTest(TestScenario.GRACEFUL_FAILOVER, MethodToTestIdempotence.ALLOCATE_BLOCK);
    }

    /** Writes across a restart of NN 0 and then writes a further block and a half. */
    @Test(timeout=30000L)
    public void testAllocateBlockAfterCrashFailover() throws Exception {
        doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED, MethodToTestIdempotence.ALLOCATE_BLOCK);
    }

    /** Writes across a restart of NN 0 and then only closes the file. */
    @Test(timeout=30000L)
    public void testCompleteFileAfterCrashFailover() throws Exception {
        doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED, MethodToTestIdempotence.COMPLETE_FILE);
    }

    /**
     * Writes a block and a half, fails over to NN 1 using {@code scenario}, checks
     * that NN 1 reports no pending-replication, corrupt or missing blocks, and then
     * finishes the file and verifies its contents.
     *
     * @param scenario how the failover from NN 0 to NN 1 is performed
     * @param methodToTest {@code ALLOCATE_BLOCK} writes another block and a half
     *        after the failover before closing; {@code COMPLETE_FILE} closes the
     *        stream without writing anything further
     * @throws Exception if any cluster, file system or assertion step fails
     */
    private void doWriteOverFailoverTest(TestScenario scenario, MethodToTestIdempotence methodToTest) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.blocksize", BLOCK_SIZE);
        conf.setInt("dfs.namenode.replication.interval", 1000);
        FSDataOutputStream stm = null;
        MiniHDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .buildHDFS();
        try {
            int sizeWritten = 0;
            cluster.waitActive();
            cluster.transitionToActive(0);
            // Short pause kept from the original test; presumably lets the cluster
            // settle after the transition -- TODO confirm.
            Thread.sleep(500L);
            LOG.info("Starting with NN 0 active");
            FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
            stm = fs.create(TEST_PATH);

            // Write a block and a half and make it visible before failing over.
            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            sizeWritten += BLOCK_AND_A_HALF;
            stm.hflush();

            LOG.info("Failing over to NN 1");
            scenario.run(cluster);

            // The new active must not consider any block pending, corrupt or missing.
            FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
            BlockManagerTestUtil.updateState(ns1.getBlockManager());
            Assert.assertEquals(0L, ns1.getPendingReplicationBlocks());
            Assert.assertEquals(0L, ns1.getCorruptReplicaBlocks());
            Assert.assertEquals(0L, ns1.getMissingBlocksCount());

            if (methodToTest == MethodToTestIdempotence.ALLOCATE_BLOCK) {
                // Crossing into the next block after the failover.
                AppendTestUtil.write(stm, sizeWritten, BLOCK_AND_A_HALF);
                sizeWritten += BLOCK_AND_A_HALF;
            }
            stm.close();
            stm = null;

            AppendTestUtil.check(fs, TEST_PATH, sizeWritten);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /** Runs the DataNode-failure write test with a graceful failover. */
    @Test(timeout=30000L)
    public void testWriteOverGracefulFailoverWithDnFail() throws Exception {
        doTestWriteOverFailoverWithDnFail(TestScenario.GRACEFUL_FAILOVER);
    }

    /** Runs the DataNode-failure write test with a restart of NN 0. */
    @Test(timeout=30000L)
    public void testWriteOverCrashFailoverWithDnFail() throws Exception {
        doTestWriteOverFailoverWithDnFail(TestScenario.ORIGINAL_ACTIVE_CRASHED);
    }

    /**
     * Writes three chunks of a block and a half each to one file on a five-DataNode
     * cluster. Between the chunks it fails over to NN 1 and stops DataNode 0, then
     * fails back to NN 0 and stops DataNode 1. Finally the file contents are
     * verified.
     *
     * @param scenario how the first failover (NN 0 to NN 1) is performed
     * @throws Exception if any cluster, file system or assertion step fails
     */
    private void doTestWriteOverFailoverWithDnFail(TestScenario scenario) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.blocksize", BLOCK_SIZE);
        FSDataOutputStream stm = null;
        MiniHDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(5)
                .buildHDFS();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);
            // Short pause kept from the original test; presumably lets the cluster
            // settle after the transition -- TODO confirm.
            Thread.sleep(500L);
            LOG.info("Starting with NN 0 active");
            FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
            stm = fs.create(TEST_PATH);

            // First chunk, written while NN 0 is active.
            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            stm.hflush();

            LOG.info("Failing over to NN 1");
            scenario.run(cluster);
            Assert.assertTrue(fs.exists(TEST_PATH));

            // Second chunk, written after losing a DataNode.
            cluster.stopDataNode(0);
            AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
            stm.hflush();

            LOG.info("Failing back to NN 0");
            cluster.transitionToStandby(1);
            cluster.transitionToActive(0);

            // Third chunk, written after losing another DataNode.
            cluster.stopDataNode(1);
            AppendTestUtil.write(stm, BLOCK_AND_A_HALF * 2, BLOCK_AND_A_HALF);
            stm.hflush();
            stm.close();
            stm = null;

            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF * 3);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Leaves a file open with a block and a half hflushed, fails over to NN 1,
     * recovers the lease as a different user, and checks the file contents both on
     * NN 1 and again after failing back to NN 0.
     */
    @Test(timeout=30000L)
    public void testLeaseRecoveryAfterFailover() throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.permissions.enabled", false);
        conf.setInt("dfs.blocksize", BLOCK_SIZE);
        FSDataOutputStream stm = null;
        MiniHDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .buildHDFS();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);
            // Short pause kept from the original test; presumably lets the cluster
            // settle after the transition -- TODO confirm.
            Thread.sleep(500L);
            LOG.info("Starting with NN 0 active");
            FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
            stm = fs.create(TEST_PATH);

            // The stream is intentionally left open; it is only closed in finally.
            AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
            stm.hflush();

            LOG.info("Failing over to NN 1");
            cluster.transitionToStandby(0);
            cluster.transitionToActive(1);
            Assert.assertTrue(fs.exists(TEST_PATH));

            FileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
            loopRecoverLease(fsOtherUser, TEST_PATH);
            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);

            // Fail back and make sure NN 0 serves the same contents.
            cluster.transitionToStandby(1);
            cluster.transitionToActive(0);
            AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Starts lease recovery on an open half-block file, holds the primary
     * DataNode's {@code commitBlockSynchronization} call to NN 0 until NN 0 has
     * become standby, and asserts that the call then fails with a message
     * containing "Operation category WRITE is not supported". Afterwards the lease
     * must still be recoverable and the file contents intact.
     */
    @Test(timeout=30000L)
    public void testFailoverRightBeforeCommitSynchronization() throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.permissions.enabled", false);
        conf.setInt("dfs.blocksize", BLOCK_SIZE);
        FSDataOutputStream stm = null;
        MiniHDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(MiniDFSNNTopology.simpleHATopology())
                .numDataNodes(3)
                .buildHDFS();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);
            // Short pause kept from the original test; presumably lets the cluster
            // settle after the transition -- TODO confirm.
            Thread.sleep(500L);
            LOG.info("Starting with NN 0 active");
            FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
            stm = fs.create(TEST_PATH);

            // Write half a block and leave the file open.
            AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2);
            stm.hflush();

            // Work out which DataNode is expected to drive block recovery and
            // intercept its commitBlockSynchronization call to NN 0.
            NameNode nn0 = cluster.getNameNode(0);
            ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
            DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk);
            LOG.info("Expecting block recovery to be triggered on DN " + expectedPrimary);

            DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort());
            DatanodeProtocolClientSideTranslatorPB nnSpy = DataNodeTestUtils.spyOnBposToNN(primaryDN, nn0);

            GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
            Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
                    Mockito.eq(blk),
                    Mockito.anyInt(),
                    Mockito.anyLong(),
                    Mockito.eq(true),
                    Mockito.eq(false),
                    (DatanodeID[])Mockito.anyObject(),
                    (String[])Mockito.anyObject());

            // The first recoverLease call is asserted to return false.
            DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
            Assert.assertFalse(fsOtherUser.recoverLease(TEST_PATH));

            LOG.info("Waiting for commitBlockSynchronization call from primary");
            delayer.waitForCall();

            LOG.info("Failing over to NN 1");
            cluster.transitionToStandby(0);
            cluster.transitionToActive(1);

            // Release the held call; it now reaches NN 0 after it became standby.
            delayer.proceed();
            delayer.waitForResult();
            Throwable t = delayer.getThrown();
            if (t == null) {
                Assert.fail("commitBlockSynchronization call did not fail on standby");
            }
            GenericTestUtils.assertExceptionContains("Operation category WRITE is not supported", t);

            loopRecoverLease(fsOtherUser, TEST_PATH);
            AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE / 2);
        } finally {
            IOUtils.closeStream(stm);
            cluster.shutdown();
        }
    }

    /**
     * Runs {@link #STRESS_NUM_THREADS} {@link PipelineTestThread}s for up to
     * {@link #STRESS_RUNTIME} milliseconds alongside the harness's replication
     * trigger thread and failover thread.
     */
    @Test(timeout=120000L)
    public void testPipelineRecoveryStress() throws Exception {
        HAStressTestHarness harness = new HAStressTestHarness();
        harness.conf.setBoolean("dfs.permissions.enabled", false);
        harness.conf.setInt("dfs.client.failover.sleep.max.millis", 1000);
        MiniHDFSCluster cluster = harness.startCluster();
        try {
            cluster.waitActive();
            cluster.transitionToActive(0);

            FileSystem fs = harness.getFailoverFs();
            FileSystem fsAsOtherUser = createFsAsOtherUser(cluster, harness.conf);

            MultithreadedTestUtil.TestContext testers = new MultithreadedTestUtil.TestContext();
            for (int i = 0; i < STRESS_NUM_THREADS; ++i) {
                Path p = new Path("/test-" + i);
                testers.addThread(new PipelineTestThread(testers, fs, fsAsOtherUser, p));
            }

            harness.addReplicationTriggerThread(500);
            harness.addFailoverThread(5000);
            harness.startThreads();
            testers.startThreads();

            testers.waitFor(STRESS_RUNTIME);
            testers.stop();
            harness.stopThreads();
        } finally {
            System.err.println("===========================\n\n\n\n");
            harness.shutdown();
        }
    }

    /**
     * Returns the expected location of {@code blk} with the greatest
     * {@code getLastUpdate()} value; on ties the earliest location in the array wins.
     *
     * @param nn the NameNode whose block manager is consulted
     * @param blk the block, which must be under construction on {@code nn}
     * @return the selected DataNode descriptor
     */
    private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn, ExtendedBlock blk) {
        BlockManager bm0 = nn.getNamesystem().getBlockManager();
        BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
        Assert.assertTrue("Block " + blk + " should be under construction, " + "got: " + storedBlock,
                storedBlock instanceof BlockInfoUnderConstruction);
        BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)storedBlock;

        DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations();
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        Assert.assertTrue("Block " + blk + " has no expected locations", datanodes.length > 0);

        DatanodeDescriptor expectedPrimary = datanodes[0];
        long mostRecentLastUpdate = expectedPrimary.getLastUpdate();
        for (int i = 1; i < datanodes.length; ++i) {
            if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
                expectedPrimary = datanodes[i];
                mostRecentLastUpdate = expectedPrimary.getLastUpdate();
            }
        }
        return expectedPrimary;
    }

    /**
     * Creates a failover-configured file system while running as the test user
     * {@code otheruser} in group {@code othergroup}.
     *
     * @param cluster the cluster to connect to
     * @param conf the configuration passed to {@code HATestUtil.configureFailoverFs}
     * @return the file system, cast to {@link DistributedFileSystem}
     */
    private DistributedFileSystem createFsAsOtherUser(final MiniHDFSCluster cluster, final Configuration conf) throws IOException, InterruptedException {
        UserGroupInformation otherUser =
                UserGroupInformation.createUserForTesting("otheruser", new String[]{"othergroup"});
        return (DistributedFileSystem)otherUser.doAs(new PrivilegedExceptionAction<FileSystem>() {
            @Override
            public FileSystem run() throws Exception {
                return HATestUtil.configureFailoverFs(cluster, conf);
            }
        });
    }

    /**
     * Calls {@code recoverLease} on {@code testPath} through
     * {@code GenericTestUtils.waitFor} (arguments 1000 and 60000) until it returns
     * {@code true}.
     *
     * @param fsOtherUser file system to recover through; must be a
     *        {@link DistributedFileSystem}
     * @param testPath the file whose lease is recovered
     * @throws TimeoutException if the wait times out; the message names
     *         {@code testPath} and the original timeout is attached as the cause
     * @throws InterruptedException if the wait is interrupted
     */
    private static void loopRecoverLease(final FileSystem fsOtherUser, final Path testPath) throws TimeoutException, InterruptedException {
        try {
            GenericTestUtils.waitFor(new Supplier<Boolean>() {
                @Override
                public Boolean get() {
                    boolean success;
                    try {
                        success = ((DistributedFileSystem)fsOtherUser).recoverLease(testPath);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    if (!success) {
                        LOG.info("Waiting to recover lease successfully");
                    }
                    return success;
                }
            }, 1000, 60000);
        } catch (TimeoutException e) {
            // TimeoutException has no (message, cause) constructor, so chain explicitly
            // to keep the original stack trace available.
            TimeoutException timedOut = new TimeoutException("Timed out recovering lease for " + testPath);
            timedOut.initCause(e);
            throw timedOut;
        }
    }

    /**
     * Stress-test worker: on each action it (re)creates its file, writes and
     * hflushes 100 bytes, recovers the lease through the other user's file system
     * and verifies the file contents.
     */
    private static class PipelineTestThread
    extends MultithreadedTestUtil.RepeatingTestThread {
        private final FileSystem fs;
        private final FileSystem fsOtherUser;
        private final Path path;

        public PipelineTestThread(MultithreadedTestUtil.TestContext ctx, FileSystem fs, FileSystem fsOtherUser, Path p) {
            super(ctx);
            this.fs = fs;
            this.fsOtherUser = fsOtherUser;
            this.path = p;
        }

        @Override
        public void doAnAction() throws Exception {
            FSDataOutputStream stm = fs.create(path, true);
            try {
                AppendTestUtil.write(stm, 0, 100);
                stm.hflush();
                loopRecoverLease(fsOtherUser, path);
                AppendTestUtil.check(fs, path, 100L);
            } finally {
                try {
                    stm.close();
                } catch (IOException ignored) {
                    // Deliberately best-effort: the lease was recovered by the other
                    // user above, so this close is presumably expected to fail.
                }
            }
        }

        @Override
        public String toString() {
            return "Pipeline test thread for " + path;
        }
    }

    /** Selects what {@code doWriteOverFailoverTest} does with the stream after the failover. */
    enum MethodToTestIdempotence {
        /** Write another block and a half before closing. */
        ALLOCATE_BLOCK,
        /** Close the stream without writing anything further. */
        COMPLETE_FILE;
    }

    /** The ways in which a test moves the active role from NN 0 to NN 1. */
    enum TestScenario {
        /** NN 0 is transitioned to standby before NN 1 is made active. */
        GRACEFUL_FAILOVER {
            @Override
            void run(MiniHDFSCluster cluster) throws IOException {
                cluster.transitionToStandby(0);
                cluster.transitionToActive(1);
            }
        },
        /** NN 0 is restarted before NN 1 is made active. */
        ORIGINAL_ACTIVE_CRASHED {
            @Override
            void run(MiniHDFSCluster cluster) throws IOException {
                cluster.restartNameNode(0);
                cluster.transitionToActive(1);
            }
        };

        /**
         * Performs the failover on {@code cluster}.
         *
         * @param cluster the HA cluster whose NameNodes are transitioned
         * @throws IOException if a NameNode operation fails
         */
        abstract void run(MiniHDFSCluster cluster) throws IOException;
    }
}

