/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.util.Quantile;
import org.apache.hadoop.metrics2.util.SampleQuantiles;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;

public class TestMultiThreadedHflush {
    /** Block size in bytes: 0x100000 = 1,048,576 (1 MiB). Same value that createFile passes as the last argument of FileSystem.create. */
    static final int blockSize = 0x100000;
    /** Thread count; equal to the value doTestMultipleHflushers passes to doMultithreadedWrites. Not referenced by name in this file. */
    private static final int NUM_THREADS = 10;
    /** Bytes per write; equal to the buffer size doTestMultipleHflushers passes to doMultithreadedWrites. Not referenced by name in this file. */
    private static final int WRITE_SIZE = 517;
    /** Writes per thread; equal to the count doTestMultipleHflushers passes to doMultithreadedWrites. Not referenced by name in this file. */
    private static final int NUM_WRITES_PER_THREAD = 1000;
    /** Payload written by every WriterThread.doAWrite call; (re)generated by initBuffer and only read afterwards. */
    private byte[] toWrite = null;
    /**
     * Collects the per-write latencies (microseconds) inserted by WriterThread.doAWrite and is
     * printed after a run. NOTE(review): each Quantile is presumably (quantile, allowed error) -
     * confirm against the Quantile class.
     */
    private final SampleQuantiles quantiles = new SampleQuantiles(new Quantile[]{new Quantile(0.5, 0.05), new Quantile(0.75, 0.025), new Quantile(0.9, 0.01), new Quantile(0.95, 0.005), new Quantile(0.99, 0.001)});

    /**
     * Creates {@code name} on {@code fileSys} (overwrite flag set) and returns the open stream.
     *
     * @param fileSys file system to create the file on; its configuration supplies
     *                {@code io.file.buffer.size} (4096 when unset)
     * @param name    path of the file to create
     * @param repl    replication factor, narrowed to {@code short}
     * @return the output stream for the new file, using a 0x100000-byte (1 MiB) block size
     * @throws IOException if the file system fails to create the file
     */
    private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) throws IOException {
        int ioBufferSize = fileSys.getConf().getInt("io.file.buffer.size", 4096);
        short replicationFactor = (short)repl;
        return fileSys.create(name, true, ioBufferSize, replicationFactor, 0x100000L);
    }

    /**
     * Replaces the shared write payload with {@code size} bytes produced by
     * {@code AppendTestUtil.randomBytes} from a freshly drawn seed.
     *
     * @param size number of bytes the payload should contain
     */
    private void initBuffer(int size) {
        this.toWrite = AppendTestUtil.randomBytes(AppendTestUtil.nextLong(), size);
    }

    /** Runs the concurrent write+hflush scenario with a replication factor of 1. */
    @Test
    public void testMultipleHflushersRepl1() throws Exception {
        final int replication = 1;
        doTestMultipleHflushers(replication);
    }

    /** Runs the concurrent write+hflush scenario with a replication factor of 3. */
    @Test
    public void testMultipleHflushersRepl3() throws Exception {
        final int replication = 3;
        doTestMultipleHflushers(replication);
    }

    /**
     * Starts a mini cluster with {@code repl} datanodes, runs the multi-threaded
     * write+hflush workload against {@code /multiple-hflushers.dat} and prints the
     * collected latency quantiles.
     *
     * <p>The cluster is shut down even when obtaining or closing the file system
     * fails, so a failing run does not leave a cluster behind.
     *
     * @param repl replication factor for the test file; also the number of datanodes started
     * @throws Exception if cluster setup or the workload fails
     */
    private void doTestMultipleHflushers(int repl) throws Exception {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(repl).build();
        try {
            // Acquired inside the try so a failure here still shuts the cluster down.
            FileSystem fs = cluster.getFileSystem();
            try {
                Path p = new Path("/multiple-hflushers.dat");
                // 10 threads, 517-byte writes, 1000 writes per thread.
                this.doMultithreadedWrites(conf, p, 10, 517, 1000, repl);
                System.out.println("Latency quantiles (in microseconds):\n" + this.quantiles);
            }
            finally {
                fs.close();
            }
        }
        finally {
            // Separate finally: runs even if fs.close() throws.
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=120000L)
    public void testHflushWhileClosing() throws Throwable {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
        FileSystem fs = cluster.getFileSystem();
        Path p = new Path("/hflush-and-close.dat");
        final FSDataOutputStream stm = this.createFile(fs, p, 1);
        ArrayList<1> flushers = new ArrayList<1>();
        final AtomicReference thrown = new AtomicReference();
        try {
            int i;
            for (i = 0; i < 10; ++i) {
                Thread thread = new Thread(){

                    @Override
                    public void run() {
                        try {
                            try {
                                while (true) {
                                    stm.hflush();
                                }
                            }
                            catch (ClosedChannelException ioe) {
                                return;
                            }
                        }
                        catch (Throwable t) {
                            thrown.set(t);
                            return;
                        }
                    }
                };
                thread.start();
                flushers.add(thread);
            }
            for (i = 0; i < 10000; ++i) {
                stm.write(1);
            }
            stm.close();
            for (Thread thread : flushers) {
                thread.join();
            }
            if (thrown.get() != null) {
                throw (Throwable)thrown.get();
            }
        }
        finally {
            fs.close();
            cluster.shutdown();
        }
    }

    /**
     * Creates {@code p} and has {@code numThreads} {@code WriterThread}s concurrently
     * write and hflush the same stream, {@code numWrites} times each.
     *
     * <p>Before the threads are released the stream is hflushed while empty, one byte
     * is written, and it is hflushed again. All threads wait on a single latch so they
     * start together. The stream is closed on every exit path; if the run has already
     * failed, an exception from that cleanup close is suppressed so the original
     * failure is the one reported.
     *
     * @param conf        configuration used to resolve the file system of {@code p}
     * @param p           file to create and write
     * @param numThreads  number of concurrent writer threads
     * @param bufferSize  size in bytes of the payload written by each write
     * @param numWrites   number of write+hflush rounds per thread
     * @param replication replication factor of the created file
     * @throws RuntimeException with message {@code "Deferred"} wrapping the first
     *                          throwable raised inside a writer thread
     * @throws Exception        if file creation, the initial flushes, joining the
     *                          threads, or the final close fails
     */
    public void doMultithreadedWrites(Configuration conf, Path p, int numThreads, int bufferSize, int numWrites, int replication) throws Exception {
        this.initBuffer(bufferSize);
        FileSystem fs = p.getFileSystem(conf);
        FSDataOutputStream stm = this.createFile(fs, p, replication);
        boolean closedCleanly = false;
        try {
            System.out.println("Created file " + p);
            // hflush with no data, then again around a single written byte.
            stm.hflush();
            stm.hflush();
            stm.write(1);
            stm.hflush();
            stm.hflush();
            // One-shot latch: every writer blocks on it until all have been started.
            CountDownLatch countdown = new CountDownLatch(1);
            ArrayList<WriterThread> threads = new ArrayList<WriterThread>();
            AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
            for (int i = 0; i < numThreads; ++i) {
                WriterThread writerThread = new WriterThread(stm, thrown, countdown, numWrites);
                threads.add(writerThread);
                writerThread.start();
            }
            countdown.countDown();
            for (Thread thread : threads) {
                thread.join();
            }
            if (thrown.get() != null) {
                throw new RuntimeException("Deferred", thrown.get());
            }
            stm.close();
            closedCleanly = true;
            System.out.println("Closed file.");
        }
        finally {
            if (!closedCleanly) {
                try {
                    stm.close();
                }
                catch (IOException ignored) {
                    // Already failing: keep the original exception rather than this cleanup error.
                }
            }
        }
    }

    /**
     * Command-line entry point: runs {@code CLIBenchmark} through {@code ToolRunner}
     * and exits the JVM with the value it returns.
     *
     * @param args command-line arguments, handed to {@code ToolRunner.run} unchanged
     * @throws Exception if the benchmark fails
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CLIBenchmark(), args);
        System.exit(exitCode);
    }

    /**
     * Stand-alone benchmark that runs {@code doMultithreadedWrites} against a
     * user-supplied path and prints the elapsed time and the latency quantiles.
     *
     * <p>Settings read from the configuration: {@code num.threads} (default 10),
     * {@code write.size} (default 511), {@code num.writes} (default 50000) and
     * {@code dfs.replication} (default 3).
     */
    private static class CLIBenchmark
    extends Configured
    implements Tool {
        private CLIBenchmark() {
        }

        /**
         * Runs the benchmark.
         *
         * @param args exactly one element: the path of the file to write
         * @return 0 on success, 1 when the argument count is wrong (usage is printed to stderr)
         * @throws Exception if the write workload fails
         */
        public int run(String[] args) throws Exception {
            if (args.length != 1) {
                System.err.println("usage: " + TestMultiThreadedHflush.class.getSimpleName() + " <path to test file> ");
                System.err.println("Configurations settable by -D options:\n  num.threads [default 10] - how many threads to run\n  write.size [default 511] - bytes per write\n  num.writes [default 50000] - how many writes to perform");
                // Report the error through the return value instead of killing the JVM here;
                // main() passes ToolRunner.run's result to System.exit.
                return 1;
            }
            TestMultiThreadedHflush test = new TestMultiThreadedHflush();
            Configuration conf = this.getConf();
            Path p = new Path(args[0]);
            int numThreads = conf.getInt("num.threads", 10);
            int writeSize = conf.getInt("write.size", 511);
            int numWrites = conf.getInt("num.writes", 50000);
            int replication = conf.getInt("dfs.replication", 3);
            Stopwatch sw = new Stopwatch().start();
            test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites, replication);
            sw.stop();
            System.out.println("Finished in " + sw.elapsedMillis() + "ms");
            System.out.println("Latency quantiles (in microseconds):\n" + test.quantiles);
            return 0;
        }
    }

    /**
     * Worker that waits on a shared latch and then performs up to {@code numWrites}
     * write+hflush rounds on a shared stream, stopping early once any failure has
     * been recorded in the shared {@code thrown} reference.
     */
    private class WriterThread
    extends Thread {
        private final FSDataOutputStream stm;
        private final AtomicReference<Throwable> thrown;
        private final int numWrites;
        private final CountDownLatch countdown;

        /**
         * @param stm       stream shared by all writers
         * @param thrown    holder for the first failure raised by any writer
         * @param countdown latch the thread awaits before it starts writing
         * @param numWrites maximum number of write+hflush rounds to perform
         */
        public WriterThread(FSDataOutputStream stm, AtomicReference<Throwable> thrown, CountDownLatch countdown, int numWrites) {
            this.countdown = countdown;
            this.numWrites = numWrites;
            this.thrown = thrown;
            this.stm = stm;
        }

        /** Waits for the start signal, then writes until done or until a failure is recorded. */
        @Override
        public void run() {
            try {
                countdown.await();
                int completed = 0;
                while (completed < numWrites && thrown.get() == null) {
                    doAWrite();
                    ++completed;
                }
            }
            catch (Throwable failure) {
                // Only the first failure is kept; later ones are dropped.
                thrown.compareAndSet(null, failure);
            }
        }

        /**
         * Writes the enclosing test's payload, hflushes, and records the elapsed
         * time of both steps, in microseconds, in the enclosing test's quantiles.
         *
         * @throws IOException if the write or the hflush fails
         */
        private void doAWrite() throws IOException {
            Stopwatch timer = new Stopwatch().start();
            stm.write(TestMultiThreadedHflush.this.toWrite);
            stm.hflush();
            long elapsedMicros = timer.elapsedTime(TimeUnit.MICROSECONDS);
            TestMultiThreadedHflush.this.quantiles.insert(elapsedMicros);
        }
    }
}

