/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.fedbalance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class TestDistCpProcedure {
    /** Mini cluster shared by every test; created in beforeClass() and shut down in afterClass(). */
    private static MiniDFSCluster cluster;
    /** Configuration the cluster is built from; also handed to schedulers and balance contexts. */
    private static Configuration conf;
    /** Mount point name passed to every FedBalanceContext built by these tests. */
    static final String MOUNT = "mock_mount_point";
    /** Name of the source directory below each test root. */
    private static final String SRCDAT = "srcdat";
    /** Name of the destination directory below each test root. */
    private static final String DSTDAT = "dstdat";
    /** Block size in bytes (1 KiB). */
    private static final long BLOCK_SIZE = 1024L;
    /** Size in bytes of every generated test file (100 KiB). */
    private static final long FILE_SIZE = 102400L;
    /** Source tree fixture: srcdat/ holding file a and directory b/, which holds file c. */
    private FileEntry[] srcfiles = {
        new FileEntry(SRCDAT, true),
        new FileEntry(SRCDAT + "/a", false),
        new FileEntry(SRCDAT + "/b", true),
        new FileEntry(SRCDAT + "/b/c", false)
    };
    /** Default file system URI of the mini cluster, as a string; set in beforeClass(). */
    private static String nnUri;
    /** Fails any single test that runs longer than 180 seconds. */
    @Rule
    public Timeout globalTimeout = new Timeout(180000L, TimeUnit.MILLISECONDS);

    /**
     * Calls {@code DistCpProcedure.enableForTest()}, starts a two-DataNode mini cluster with a
     * 1 KiB block size, points the procedure scheduler journal at {@code /procedure} on that
     * cluster, and records the cluster's default file system URI in {@code nnUri}.
     *
     * @throws IOException if the mini cluster cannot be built or does not become active
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        DistCpProcedure.enableForTest();

        conf = new Configuration();
        // Both the minimum allowed and the default block size are lowered to BLOCK_SIZE.
        conf.setLong("dfs.namenode.fs-limits.min-block-size", BLOCK_SIZE);
        conf.setLong("dfs.blocksize", BLOCK_SIZE);

        MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf);
        cluster = clusterBuilder.numDataNodes(2).build();
        cluster.waitActive();

        String nameNodeAddress = cluster.getNameNode().getHostAndPort();
        String journalUri = "hdfs://" + nameNodeAddress + "/procedure";
        conf.set("hdfs.fedbalance.procedure.scheduler.journal.uri", journalUri);

        URI defaultUri = FileSystem.getDefaultUri(conf);
        nnUri = defaultUri.toString();
    }

    /**
     * Calls {@code DistCpProcedure.disableForTest()} and shuts the mini cluster down if it was
     * ever created.
     */
    @AfterClass
    public static void afterClass() {
        DistCpProcedure.disableForTest();
        if (cluster == null) {
            // Cluster start-up never got far enough to assign the field; nothing to stop.
            return;
        }
        cluster.shutdown();
    }

    /**
     * Runs a complete DistCpProcedure through a BalanceProcedureScheduler and checks the end
     * state: the job finished without error, the destination exists, neither side still has a
     * {@code .snapshot} entry, the destination carries the source's original permission, the
     * source permission is 0, and every copied file has the expected length.
     *
     * @throws Exception the job's own error if it failed, or any file system / scheduler failure
     */
    @Test
    public void testSuccessfulDistCpProcedure() throws Exception {
        // getMethodName() must be called directly from the test method to yield this test's name.
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, srcfiles);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        // Octal literal for rwxrwxrwx. Decimal 777 would be octal 01411 (sticky + r----x--x).
        FsPermission originalPerm = new FsPermission((short) 0777);
        fs.setPermission(src, originalPerm);
        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000L, context);

        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
        scheduler.init(true);
        try {
            BalanceJob balanceJob = new BalanceJob.Builder().nextProcedure((BalanceProcedure) dcProcedure).build();
            scheduler.submit(balanceJob);
            scheduler.waitUntilDone(balanceJob);
            TestCase.assertTrue(balanceJob.isJobDone());
            // Rethrow the job's own failure so the test report shows the real cause. After this
            // check the error is known to be null, so no separate assertNull is needed.
            if (balanceJob.getError() != null) {
                throw balanceJob.getError();
            }
            TestCase.assertTrue(fs.exists(dst));
            Assert.assertFalse(fs.exists(new Path(context.getSrc(), ".snapshot")));
            Assert.assertFalse(fs.exists(new Path(context.getDst(), ".snapshot")));
            Assert.assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
            Assert.assertEquals(0L, fs.getFileStatus(src).getPermission().toShort());
            for (FileEntry e : srcfiles) {
                if (e.isDirectory()) {
                    continue;
                }
                Path targetFile = new Path(testRoot, e.getPath().replace(SRCDAT, DSTDAT));
                Assert.assertEquals(FILE_SIZE, fs.getFileStatus(targetFile).getLen());
            }
        } finally {
            // Always stop the scheduler so it does not keep running during later tests.
            scheduler.shutDown();
        }
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Drives the init-distcp stage: calls initDistCp() once, deletes {@code a} from the source,
     * then keeps calling initDistCp() until the procedure reaches DIFF_DISTCP. Afterwards both
     * the destination directory and {@code a} inside it must exist.
     */
    @Test
    public void testInitDistCp() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, srcfiles);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        // Octal 020 (decimal 16): only the group-write bit is set on the source.
        FsPermission groupWriteOnly = FsPermission.createImmutable((short) 020);
        fs.setPermission(src, groupWriteOnly);

        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000L, context);
        try {
            dcProcedure.initDistCp();
        } catch (BalanceProcedure.RetryException ignored) {
            // Deliberately ignored: a retry request from the first call is acceptable here,
            // the stage is driven to completion by executeProcedure below.
        }

        // Remove a source file after the first initDistCp() call; dst/a is still expected below.
        Path deletedSourceFile = new Path(src, "a");
        fs.delete(deletedSourceFile, true);
        executeProcedure(dcProcedure, DistCpProcedure.Stage.DIFF_DISTCP, dcProcedure::initDistCp);

        TestCase.assertTrue(fs.exists(dst));
        TestCase.assertTrue(fs.exists(new Path(dst, "a")));
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Builds a context with a diff threshold of 10, completes the initial copy, then renames the
     * file {@code a} five times. After each rename diffDistCpStageDone() must return true and
     * repeated diffDistCp() calls must bring the procedure to DISABLE_WRITE.
     */
    @Test
    public void testDiffThreshold() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, srcfiles);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);

        final int diffThreshold = 10;
        FedBalanceContext context = buildContext(src, dst, MOUNT, diffThreshold);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000L, context);
        executeProcedure(dcProcedure, DistCpProcedure.Stage.DIFF_DISTCP, dcProcedure::initDistCp);

        Path current = new Path(src, "a");
        int round = 0;
        while (round < 5) {
            // Each round produces exactly one rename: a -> a-0 -> a-1 -> ...
            Path renamed = new Path(src, "a-" + round);
            fs.rename(current, renamed);
            current = renamed;
            TestCase.assertTrue(dcProcedure.diffDistCpStageDone());
            executeProcedure(dcProcedure, DistCpProcedure.Stage.DISABLE_WRITE, dcProcedure::diffDistCp);
            round++;
        }
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Checks that finalDistCp() propagates three kinds of source changes made after the initial
     * copy: a file renamed out of the source tree disappears from the destination, a file renamed
     * back in reappears, and appended bytes show up as an equal file length on the destination.
     */
    @Test
    public void testDiffDistCp() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, srcfiles);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        Path srcFile = new Path(src, "a");
        Path dstFile = new Path(dst, "a");
        Path outsideSrc = new Path("/a");
        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000L, context);

        executeProcedure(dcProcedure, DistCpProcedure.Stage.DIFF_DISTCP, () -> dcProcedure.initDistCp());
        TestCase.assertTrue(fs.exists(dst));

        // 1) Move the file out of the source tree: it must vanish from the destination.
        fs.rename(srcFile, outsideSrc);
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINISH, () -> dcProcedure.finalDistCp());
        Assert.assertFalse(fs.exists(dstFile));

        // 2) Move it back: it must be present on the destination again.
        fs.rename(outsideSrc, srcFile);
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINISH, () -> dcProcedure.finalDistCp());
        TestCase.assertTrue(fs.exists(dstFile));

        // 3) Append to it: the destination length must follow. try-with-resources closes the
        // stream even if write() throws; the charset is explicit so the bytes do not depend on
        // the platform default.
        try (FSDataOutputStream out = fs.append(srcFile)) {
            out.write("hello".getBytes(StandardCharsets.UTF_8));
        }
        long len = fs.getFileStatus(srcFile).getLen();
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINISH, () -> dcProcedure.finalDistCp());
        Assert.assertEquals(len, fs.getFileStatus(dstFile).getLen());

        cleanup(fs, new Path(testRoot));
    }

    /**
     * Opens an append stream on a source file before the procedure runs, drives the procedure
     * through initDistCp() and finalDistCp(), and then expects closing that stream to fail with a
     * RemoteException whose text contains "LeaseExpiredException".
     */
    @Test
    public void testStageFinalDistCp() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, srcfiles);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);

        // Left open on purpose: the stream is closed only inside the intercept call below.
        FSDataOutputStream out = fs.append(new Path(src, "a"));

        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000L, context);
        executeProcedure(dcProcedure, DistCpProcedure.Stage.DIFF_DISTCP, dcProcedure::initDistCp);
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINISH, dcProcedure::finalDistCp);

        LambdaTestUtils.intercept(
            RemoteException.class,
            "LeaseExpiredException",
            "Expect RemoteException(LeaseExpiredException).",
            () -> out.close());
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Prepares source and destination directories with the balance snapshots already in place,
     * calls disableWrite() and finish() directly, and checks the result: the destination exists,
     * neither side has a {@code .snapshot} entry, the destination has the source's original
     * permission, and the source permission is 0.
     */
    @Test
    public void testStageFinish() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        fs.mkdirs(src);
        fs.mkdirs(dst);
        fs.allowSnapshot(src);
        fs.allowSnapshot(dst);
        fs.createSnapshot(src, "DISTCP-BALANCE-CURRENT");
        fs.createSnapshot(src, "DISTCP-BALANCE-NEXT");
        fs.createSnapshot(dst, "DISTCP-BALANCE-CURRENT");
        // Octal literal for rwxrwxrwx. Decimal 777 would be octal 01411 (sticky + r----x--x).
        FsPermission originalPerm = new FsPermission((short) 0777);
        fs.setPermission(src, originalPerm);

        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000L, context);
        dcProcedure.disableWrite(context);
        dcProcedure.finish();

        TestCase.assertTrue(fs.exists(dst));
        Assert.assertFalse(fs.exists(new Path(src, ".snapshot")));
        Assert.assertFalse(fs.exists(new Path(dst, ".snapshot")));
        Assert.assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
        Assert.assertEquals(0L, fs.getFileStatus(src).getPermission().toShort());
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Walks the procedure through its stages one at a time, replacing the instance with a
     * serialize/deserialize copy before each stage, so that every stage is executed by an
     * instance restored from its written form. Finishes with execute() and checks that the
     * destination exists and no {@code .snapshot} entry is left on either side.
     */
    @Test
    public void testRecoveryByStage() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, srcfiles);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        FedBalanceContext context = buildContext(src, dst, MOUNT);

        // A method reference binds its receiver when it is created, so the reassigned local can
        // be used directly; only the lambda below needs a final alias.
        DistCpProcedure proc = new DistCpProcedure("distcp-procedure", null, 1000L, context);

        proc = serializeProcedure(proc);
        executeProcedure(proc, DistCpProcedure.Stage.INIT_DISTCP, proc::preCheck);

        proc = serializeProcedure(proc);
        executeProcedure(proc, DistCpProcedure.Stage.DIFF_DISTCP, proc::initDistCp);
        fs.delete(new Path(src, "a"), true);

        proc = serializeProcedure(proc);
        executeProcedure(proc, DistCpProcedure.Stage.DISABLE_WRITE, proc::diffDistCp);

        proc = serializeProcedure(proc);
        final DistCpProcedure writeDisabler = proc;
        executeProcedure(writeDisabler, DistCpProcedure.Stage.FINAL_DISTCP, () -> writeDisabler.disableWrite(context));

        proc = serializeProcedure(proc);
        // Left open on purpose: closing it after finalDistCp() is expected to fail.
        FSDataOutputStream out = fs.append(new Path(src, "b/c"));
        executeProcedure(proc, DistCpProcedure.Stage.FINISH, proc::finalDistCp);
        LambdaTestUtils.intercept(
            RemoteException.class,
            "LeaseExpiredException",
            "Expect RemoteException(LeaseExpiredException).",
            () -> out.close());

        proc = serializeProcedure(proc);
        TestCase.assertTrue(proc.execute());
        TestCase.assertTrue(fs.exists(dst));
        Assert.assertFalse(fs.exists(new Path(context.getSrc(), ".snapshot")));
        Assert.assertFalse(fs.exists(new Path(context.getDst(), ".snapshot")));
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Submits a balance job, waits a random time below ten seconds, then shuts the scheduler
     * down while the job may still be running. There are no assertions: the test passes when
     * shutdown and cleanup complete without throwing.
     */
    @Test
    public void testShutdown() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, srcfiles);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000L, context);

        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
        scheduler.init(true);
        try {
            BalanceJob balanceJob = new BalanceJob.Builder().nextProcedure((BalanceProcedure) dcProcedure).build();
            scheduler.submit(balanceJob);
            // nextInt(bound) is always in [0, bound). Math.abs(nextLong()) % bound is negative
            // when nextLong() returns Long.MIN_VALUE, which would make Thread.sleep throw.
            int sleepMillis = new Random().nextInt(10000);
            Thread.sleep(sleepMillis);
        } finally {
            // Shut down even if submit or the sleep fails, so the scheduler is not left running.
            scheduler.shutDown();
        }
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Verifies the disable-write stage: the source permission is non-zero beforehand and exactly
     * 0 once disableWrite() has moved the procedure to FINAL_DISTCP.
     */
    @Test
    public void testDisableWrite() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, srcfiles);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", null, 1000L, context);

        short modeBefore = fs.getFileStatus(src).getPermission().toShort();
        Assert.assertNotEquals(0L, modeBefore);

        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINAL_DISTCP, () -> dcProcedure.disableWrite(context));

        short modeAfter = fs.getFileStatus(src).getPermission().toShort();
        Assert.assertEquals(0L, modeAfter);
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Builds a balance context with a diff threshold of 0; see the four-argument overload for
     * the remaining fixed settings.
     *
     * @param src source path
     * @param dst destination path
     * @param mount mount point name
     * @return the built context
     */
    private FedBalanceContext buildContext(Path src, Path dst, String mount) {
        final int defaultDiffThreshold = 0;
        return buildContext(src, dst, mount, defaultDiffThreshold);
    }

    /**
     * Builds a balance context from the shared test configuration with fixed settings: map
     * number 10, bandwidth limit 1, trash option TRASH and delay duration 1000.
     *
     * @param src source path
     * @param dst destination path
     * @param mount mount point name
     * @param diffThreshold value passed to {@code setDiffThreshold}
     * @return the built context
     */
    private FedBalanceContext buildContext(Path src, Path dst, String mount, int diffThreshold) {
        FedBalanceContext context = new FedBalanceContext.Builder(src, dst, mount, conf)
            .setMapNum(10)
            .setBandwidthLimit(1)
            .setTrash(FedBalanceConfigs.TrashOption.TRASH)
            .setDelayDuration(1000L)
            .setDiffThreshold(diffThreshold)
            .build();
        return context;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected static void executeProcedure(DistCpProcedure procedure, DistCpProcedure.Stage target, Call call) throws IOException {
        DistCpProcedure.Stage stage = DistCpProcedure.Stage.PRE_CHECK;
        procedure.updateStage(stage);
        while (stage != target) {
            try {
                call.execute();
            }
            catch (BalanceProcedure.RetryException retryException) {}
            continue;
            finally {
                stage = procedure.getStage();
            }
        }
    }

    /**
     * Creates every entry below {@code topdir}: directories via mkdirs, files via
     * {@code DFSTestUtil.createFile} with FILE_SIZE bytes, BLOCK_SIZE blocks, replication 2 and a
     * 128-byte buffer. The content seed starts at the current time and is re-derived from the
     * clock plus a random offset after every entry.
     *
     * @param fs file system to create the entries on
     * @param topdir parent directory, as a string, that each entry path is appended to
     * @param entries entries to create, in order
     * @throws IOException if a directory or file cannot be created
     */
    private void createFiles(DistributedFileSystem fs, String topdir, FileEntry[] entries) throws IOException {
        final short replication = 2;
        final int bufferLen = 128;
        long nextSeed = System.currentTimeMillis();
        Random seedOffsets = new Random(nextSeed);
        for (int i = 0; i < entries.length; i++) {
            FileEntry current = entries[i];
            Path target = new Path(topdir + "/" + current.getPath());
            if (!current.isDirectory()) {
                DFSTestUtil.createFile(fs, target, bufferLen, FILE_SIZE, BLOCK_SIZE, replication, nextSeed);
            } else {
                fs.mkdirs(target);
            }
            // Advance the seed after every entry, directories included.
            nextSeed = System.currentTimeMillis() + seedOffsets.nextLong();
        }
    }

    /**
     * Round-trips a procedure through its own write()/readFields() pair using an in-memory
     * buffer.
     *
     * @param dcp procedure to serialize
     * @return a new DistCpProcedure populated by readFields() from the written bytes
     * @throws IOException if writing or reading the fields fails
     */
    private DistCpProcedure serializeProcedure(DistCpProcedure dcp) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        dcp.write(new DataOutputStream(buffer));
        byte[] written = buffer.toByteArray();

        DistCpProcedure restored = new DistCpProcedure();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(written)));
        return restored;
    }

    /**
     * Calls {@code DistCpProcedure.cleanupSnapshot} on the source and then the destination
     * directory below {@code root}, and finally deletes {@code root} recursively.
     *
     * @param dfs file system holding the test tree
     * @param root test root directory to remove
     * @throws IOException if snapshot cleanup or the delete fails
     */
    private void cleanup(DistributedFileSystem dfs, Path root) throws IOException {
        // Source first, destination second.
        String[] snapshottedDirs = {SRCDAT, DSTDAT};
        for (String dirName : snapshottedDirs) {
            DistCpProcedure.cleanupSnapshot(dfs, new Path(root, dirName));
        }
        dfs.delete(root, true);
    }

    /**
     * Closes {@code out}, propagating whatever {@code close()} throws.
     *
     * <p>NOTE(review): the {@code lambda$...} name and the {@code synthetic} marker suggest this
     * is a compiler-generated lambda body that the decompiler emitted as a named method.
     *
     * @param out stream to close
     * @throws Exception anything thrown by {@code out.close()}
     */
    private static /* synthetic */ void lambda$testRecoveryByStage$15(OutputStream out) throws Exception {
        out.close();
    }

    /**
     * Closes {@code out}, propagating whatever {@code close()} throws.
     *
     * <p>NOTE(review): the {@code lambda$...} name and the {@code synthetic} marker suggest this
     * is a compiler-generated lambda body that the decompiler emitted as a named method.
     *
     * @param out stream to close
     * @throws Exception anything thrown by {@code out.close()}
     */
    private static /* synthetic */ void lambda$testStageFinalDistCp$9(OutputStream out) throws Exception {
        out.close();
    }

    /** Immutable description of one fixture entry: a relative path and whether it is a directory. */
    static class FileEntry {
        /** Path of the entry, relative to the directory it is created under. */
        private final String path;
        /** {@code true} for a directory, {@code false} for a regular file. */
        private final boolean isDir;

        /**
         * @param path relative path of the entry
         * @param isDir whether the entry is a directory
         */
        FileEntry(String path, boolean isDir) {
            this.isDir = isDir;
            this.path = path;
        }

        /** @return the relative path given at construction */
        String getPath() {
            return path;
        }

        /** @return {@code true} if this entry is a directory */
        boolean isDirectory() {
            return isDir;
        }
    }

    protected static interface Call {
        public void execute() throws IOException, BalanceProcedure.RetryException;
    }
}

