/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.log4j.Level;
import org.junit.Test;

public class TestMultiThreadedHflush {
    static final int blockSize = 0x100000;
    static final int numBlocks = 10;
    static final int fileSize = 0xA00001;
    private static final int NUM_THREADS = 10;
    private static final int WRITE_SIZE = 517;
    private static final int NUM_WRITES_PER_THREAD = 1000;
    private byte[] toWrite = null;

    public TestMultiThreadedHflush() {
        ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
    }

    private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) throws IOException {
        FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf().getInt("io.file.buffer.size", 4096), (short)repl, 0x100000L);
        return stm;
    }

    private void initBuffer(int size) {
        long seed = AppendTestUtil.nextLong();
        this.toWrite = AppendTestUtil.randomBytes(seed, size);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleHflushers() throws Exception {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
        FileSystem fs = cluster.getFileSystem();
        Path p = new Path("/multiple-hflushers.dat");
        try {
            this.doMultithreadedWrites(conf, p, 10, 517, 1000);
        }
        finally {
            fs.close();
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHflushWhileClosing() throws Throwable {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
        FileSystem fs = cluster.getFileSystem();
        Path p = new Path("/hflush-and-close.dat");
        final FSDataOutputStream stm = this.createFile(fs, p, 1);
        ArrayList<1> flushers = new ArrayList<1>();
        final AtomicReference thrown = new AtomicReference();
        try {
            int i;
            for (i = 0; i < 10; ++i) {
                Thread thread = new Thread(){

                    @Override
                    public void run() {
                        try {
                            try {
                                while (true) {
                                    stm.hflush();
                                }
                            }
                            catch (IOException ioe) {
                                if (!ioe.toString().contains("DFSOutputStream is closed")) {
                                    throw ioe;
                                }
                                return;
                            }
                        }
                        catch (Throwable t) {
                            thrown.set(t);
                            return;
                        }
                    }
                };
                thread.start();
                flushers.add(thread);
            }
            for (i = 0; i < 10000; ++i) {
                stm.write(1);
            }
            stm.close();
            for (Thread thread : flushers) {
                thread.join();
            }
            if (thrown.get() != null) {
                throw (Throwable)thrown.get();
            }
        }
        finally {
            fs.close();
            cluster.shutdown();
        }
    }

    public void doMultithreadedWrites(Configuration conf, Path p, int numThreads, int bufferSize, int numWrites) throws Exception {
        this.initBuffer(bufferSize);
        FileSystem fs = p.getFileSystem(conf);
        FSDataOutputStream stm = this.createFile(fs, p, 1);
        System.out.println("Created file simpleFlush.dat");
        stm.hflush();
        stm.hflush();
        stm.write(1);
        stm.hflush();
        stm.hflush();
        CountDownLatch countdown = new CountDownLatch(1);
        ArrayList<WriterThread> threads = new ArrayList<WriterThread>();
        AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
        for (int i = 0; i < numThreads; ++i) {
            WriterThread writerThread = new WriterThread(stm, thrown, countdown, numWrites);
            threads.add(writerThread);
            writerThread.start();
        }
        countdown.countDown();
        for (Thread thread : threads) {
            thread.join();
        }
        if (thrown.get() != null) {
            throw new RuntimeException("Deferred", thrown.get());
        }
        stm.close();
        System.out.println("Closed file.");
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: " + TestMultiThreadedHflush.class.getSimpleName() + " <path to test file> ");
            System.exit(1);
        }
        TestMultiThreadedHflush test = new TestMultiThreadedHflush();
        Configuration conf = new Configuration();
        Path p = new Path(args[0]);
        long st = System.nanoTime();
        test.doMultithreadedWrites(conf, p, 10, 511, 50000);
        long et = System.nanoTime();
        System.out.println("Finished in " + (et - st) / 1000000L + "ms");
    }

    private class WriterThread
    extends Thread {
        private final FSDataOutputStream stm;
        private final AtomicReference<Throwable> thrown;
        private final int numWrites;
        private final CountDownLatch countdown;

        public WriterThread(FSDataOutputStream stm, AtomicReference<Throwable> thrown, CountDownLatch countdown, int numWrites) {
            this.stm = stm;
            this.thrown = thrown;
            this.numWrites = numWrites;
            this.countdown = countdown;
        }

        @Override
        public void run() {
            try {
                this.countdown.await();
                for (int i = 0; i < this.numWrites && this.thrown.get() == null; ++i) {
                    this.doAWrite();
                }
            }
            catch (Throwable t) {
                this.thrown.compareAndSet(null, t);
            }
        }

        private void doAWrite() throws IOException {
            this.stm.write(TestMultiThreadedHflush.this.toWrite);
            this.stm.hflush();
        }
    }
}

