/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestEditLogRace
extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
    static final int NUM_THREADS = 16;
    static final int NUM_ROLLS = 30;
    static final int NUM_SAVE_IMAGE = 30;
    private List<Transactions> workers = new ArrayList<Transactions>();
    private static final int NUM_DATA_NODES = 1;
    private static final int BLOCK_TIME = 10;

    private void startTransactionWorkers(FSNamesystem namesystem, AtomicReference<Throwable> caughtErr) {
        for (int i = 0; i < 16; ++i) {
            Transactions trans = new Transactions(namesystem, caughtErr);
            new Thread((Runnable)trans, "TransactionThread-" + i).start();
            this.workers.add(trans);
        }
    }

    private void stopTransactionWorkers() {
        for (Transactions worker : this.workers) {
            worker.stop();
        }
        for (Transactions worker : this.workers) {
            Thread thr = worker.getThread();
            try {
                if (thr == null) continue;
                thr.join();
            }
            catch (InterruptedException ie) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testEditLogRolling() throws Exception {
        FileSystem fileSys;
        MiniDFSCluster cluster;
        block10: {
            Configuration conf = this.getConf();
            cluster = null;
            fileSys = null;
            AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
            try {
                cluster = new MiniDFSCluster(conf, 1, true, null);
                cluster.waitActive();
                fileSys = cluster.getFileSystem();
                FSNamesystem namesystem = cluster.getNameNode().namesystem;
                FSImage fsimage = namesystem.getFSImage();
                FSEditLog editLog = fsimage.getEditLog();
                FSEditLog.setBufferCapacity((int)2048);
                editLog.close();
                editLog.open();
                this.startTransactionWorkers(namesystem, caughtErr);
                for (int i = 0; i < 30 && caughtErr.get() == null; ++i) {
                    try {
                        Thread.sleep(20L);
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                    }
                    LOG.info((Object)("Starting roll " + i + "."));
                    editLog.rollEditLog();
                    LOG.info((Object)("Roll complete " + i + "."));
                    this.verifyEditLogs(fsimage);
                    LOG.info((Object)("Starting purge " + i + "."));
                    editLog.purgeEditLog();
                    LOG.info((Object)("Complete purge " + i + "."));
                }
                this.stopTransactionWorkers();
                if (caughtErr.get() == null) break block10;
            }
            catch (Throwable throwable) {
                this.stopTransactionWorkers();
                if (caughtErr.get() != null) {
                    throw new RuntimeException((Throwable)caughtErr.get());
                }
                if (fileSys != null) {
                    fileSys.close();
                }
                if (cluster != null) {
                    cluster.shutdown();
                }
                throw throwable;
            }
            throw new RuntimeException(caughtErr.get());
        }
        if (fileSys != null) {
            fileSys.close();
        }
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    private void verifyEditLogs(FSImage fsimage) throws IOException {
        Iterator it = fsimage.dirIterator((Storage.StorageDirType)FSImage.NameNodeDirType.EDITS);
        while (it.hasNext()) {
            File editFile = FSImage.getImageFile((Storage.StorageDirectory)((Storage.StorageDirectory)it.next()), (FSImage.NameNodeFile)FSImage.NameNodeFile.EDITS);
            System.out.println("Verifying file: " + editFile);
            int numEdits = FSEditLog.loadFSEdits((EditLogInputStream)new FSEditLog.EditLogFileInputStream(editFile));
            System.out.println("Number of edits: " + numEdits);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testSaveNamespace() throws Exception {
        FileSystem fileSys;
        MiniDFSCluster cluster;
        block10: {
            Configuration conf = this.getConf();
            cluster = null;
            fileSys = null;
            AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
            try {
                cluster = new MiniDFSCluster(conf, 1, true, null);
                cluster.waitActive();
                fileSys = cluster.getFileSystem();
                FSNamesystem namesystem = FSNamesystem.getFSNamesystem();
                FSImage fsimage = namesystem.getFSImage();
                FSEditLog editLog = fsimage.getEditLog();
                FSEditLog.setBufferCapacity((int)2048);
                editLog.close();
                editLog.open();
                TestEditLogRace.assertTrue((editLog.getEditStreams().size() > 0 ? 1 : 0) != 0);
                this.startTransactionWorkers(namesystem, caughtErr);
                for (int i = 0; i < 30; ++i) {
                    try {
                        Thread.sleep(20L);
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                    }
                    LOG.info((Object)("Save " + i + ": entering safe mode"));
                    namesystem.enterSafeMode();
                    this.verifyEditLogs(fsimage);
                    LOG.info((Object)("Save " + i + ": saving namespace"));
                    namesystem.saveNamespace();
                    LOG.info((Object)("Save " + i + ": leaving safemode"));
                    this.verifyEditLogs(fsimage);
                    namesystem.leaveSafeMode(false);
                    LOG.info((Object)("Save " + i + ": complete"));
                }
                this.stopTransactionWorkers();
                if (caughtErr.get() == null) break block10;
            }
            catch (Throwable throwable) {
                this.stopTransactionWorkers();
                if (caughtErr.get() != null) {
                    throw new RuntimeException((Throwable)caughtErr.get());
                }
                if (fileSys != null) {
                    fileSys.close();
                }
                if (cluster != null) {
                    cluster.shutdown();
                }
                throw throwable;
            }
            throw new RuntimeException(caughtErr.get());
        }
        if (fileSys != null) {
            fileSys.close();
        }
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    private Configuration getConf() {
        Configuration conf = new Configuration();
        conf.set("dfs.name.dir", MiniDFSCluster.getBaseDir() + "/data");
        conf.setBoolean("dfs.permissions", false);
        return conf;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testSaveImageWhileSyncInProgress() throws Throwable {
        Configuration conf = this.getConf();
        NameNode.format((Configuration)conf);
        NameNode fakeNN = (NameNode)Mockito.mock(NameNode.class);
        NameNode.myMetrics = new NameNodeMetrics(conf, fakeNN);
        ((NameNode)Mockito.doReturn((Object)new InetSocketAddress("127.0.0.1", 12345)).when((Object)fakeNN)).getNameNodeAddress();
        final FSNamesystem namesystem = new FSNamesystem(fakeNN, conf);
        try {
            FSImage fsimage = namesystem.getFSImage();
            FSEditLog editLog = fsimage.getEditLog();
            ArrayList streams = editLog.getEditStreams();
            EditLogOutputStream spyElos = (EditLogOutputStream)Mockito.spy(streams.get(0));
            streams.set(0, spyElos);
            final AtomicReference deferredException = new AtomicReference();
            final CountDownLatch waitToEnterFlush = new CountDownLatch(1);
            final Thread doAnEditThread = new Thread(){

                @Override
                public void run() {
                    try {
                        LOG.info((Object)"Starting mkdirs");
                        namesystem.mkdirs("/test", new PermissionStatus("test", "test", new FsPermission(493)));
                        LOG.info((Object)"mkdirs complete");
                    }
                    catch (Throwable ioe) {
                        deferredException.set(ioe);
                        waitToEnterFlush.countDown();
                    }
                }
            };
            Answer<Void> blockingFlush = new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    LOG.info((Object)"Flush called");
                    if (Thread.currentThread() == doAnEditThread) {
                        LOG.info((Object)"edit thread: Telling main thread we made it to flush section...");
                        waitToEnterFlush.countDown();
                        LOG.info((Object)"edit thread: sleeping for 10secs");
                        Thread.sleep(10000L);
                        LOG.info((Object)"Going through to flush. This will allow the main thread to continue.");
                    }
                    invocation.callRealMethod();
                    LOG.info((Object)"Flush complete");
                    return null;
                }
            };
            ((EditLogOutputStream)Mockito.doAnswer((Answer)blockingFlush).when((Object)spyElos)).flush();
            doAnEditThread.start();
            LOG.info((Object)"Main thread: waiting to enter flush...");
            waitToEnterFlush.await();
            if (deferredException.get() != null) {
                throw (Throwable)deferredException.get();
            }
            LOG.info((Object)"Main thread: detected that logSync is in unsynchronized section.");
            LOG.info((Object)"Trying to enter safe mode.");
            LOG.info((Object)"This should block for 10sec, since flush will sleep that long");
            long st = System.currentTimeMillis();
            namesystem.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
            long et = System.currentTimeMillis();
            LOG.info((Object)"Entered safe mode");
            TestEditLogRace.assertTrue((et - st > 9000L ? 1 : 0) != 0);
            namesystem.saveNamespace();
            LOG.info((Object)"Joining on edit thread...");
            doAnEditThread.join();
            TestEditLogRace.assertNull(deferredException.get());
            this.verifyEditLogs(fsimage);
        }
        finally {
            LOG.info((Object)"Closing namesystem");
            if (namesystem != null) {
                namesystem.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testSaveRightBeforeSync() throws Exception {
        Configuration conf = this.getConf();
        NameNode.format((Configuration)conf);
        NameNode fakeNN = (NameNode)Mockito.mock(NameNode.class);
        NameNode.myMetrics = new NameNodeMetrics(conf, fakeNN);
        ((NameNode)Mockito.doReturn((Object)new InetSocketAddress("127.0.0.1", 12345)).when((Object)fakeNN)).getNameNodeAddress();
        final FSNamesystem namesystem = new FSNamesystem(fakeNN, conf);
        try {
            FSEditLog editLog;
            FSImage fsimage = namesystem.getFSImage();
            fsimage.editLog = editLog = (FSEditLog)Mockito.spy((Object)fsimage.getEditLog());
            final AtomicReference deferredException = new AtomicReference();
            final CountDownLatch waitToEnterSync = new CountDownLatch(1);
            final Thread doAnEditThread = new Thread(){

                @Override
                public void run() {
                    try {
                        LOG.info((Object)"Starting mkdirs");
                        namesystem.mkdirs("/test", new PermissionStatus("test", "test", new FsPermission(493)));
                        LOG.info((Object)"mkdirs complete");
                    }
                    catch (Throwable ioe) {
                        deferredException.set(ioe);
                        waitToEnterSync.countDown();
                    }
                }
            };
            Answer<Void> blockingSync = new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    LOG.info((Object)"logSync called");
                    if (Thread.currentThread() == doAnEditThread) {
                        LOG.info((Object)"edit thread: Telling main thread we made it just before logSync...");
                        waitToEnterSync.countDown();
                        LOG.info((Object)"edit thread: sleeping for 10secs");
                        Thread.sleep(10000L);
                        LOG.info((Object)"Going through to logSync. This will allow the main thread to continue.");
                    }
                    invocation.callRealMethod();
                    LOG.info((Object)"logSync complete");
                    return null;
                }
            };
            ((FSEditLog)Mockito.doAnswer((Answer)blockingSync).when((Object)editLog)).logSync();
            doAnEditThread.start();
            LOG.info((Object)"Main thread: waiting to just before logSync...");
            waitToEnterSync.await();
            TestEditLogRace.assertNull(deferredException.get());
            LOG.info((Object)"Main thread: detected that logSync about to be called.");
            LOG.info((Object)"Trying to enter safe mode.");
            LOG.info((Object)"This should block for 10sec, since we have pending edits");
            long st = System.currentTimeMillis();
            namesystem.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
            long et = System.currentTimeMillis();
            LOG.info((Object)"Entered safe mode");
            TestEditLogRace.assertTrue((et - st > 9000L ? 1 : 0) != 0);
            namesystem.saveNamespace();
            LOG.info((Object)"Joining on edit thread...");
            doAnEditThread.join();
            TestEditLogRace.assertNull(deferredException.get());
            this.verifyEditLogs(fsimage);
        }
        finally {
            LOG.info((Object)"Closing namesystem");
            if (namesystem != null) {
                namesystem.close();
            }
        }
    }

    static class Transactions
    implements Runnable {
        final FSNamesystem namesystem;
        short replication = (short)3;
        long blockSize = 64L;
        volatile boolean stopped = false;
        volatile Thread thr;
        final AtomicReference<Throwable> caught;

        Transactions(FSNamesystem ns, AtomicReference<Throwable> caught) {
            this.namesystem = ns;
            this.caught = caught;
        }

        @Override
        public void run() {
            this.thr = Thread.currentThread();
            PermissionStatus p = FSNamesystem.getFSNamesystem().createFsOwnerPermissions(new FsPermission(511));
            int i = 0;
            while (!this.stopped) {
                try {
                    String dirname = "/thr-" + this.thr.getId() + "-dir-" + i;
                    this.namesystem.mkdirs(dirname, p);
                    this.namesystem.delete(dirname, true);
                }
                catch (SafeModeException sme) {
                }
                catch (Throwable e) {
                    LOG.warn((Object)"Got error in transaction thread", e);
                    this.caught.compareAndSet(null, e);
                    break;
                }
            }
        }

        public void stop() {
            this.stopped = true;
        }

        public Thread getThread() {
            return this.thr;
        }
    }
}

