/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.benchmark;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.FileSystems;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FileRangeImpl;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@BenchmarkMode(value={Mode.AverageTime})
@OutputTimeUnit(value=TimeUnit.MICROSECONDS)
public class VectoredReadBenchmark {
    /** Data file read by the benchmarks; resolved once, at class initialization, by {@link #getTestDataPath()}. */
    static final Path DATA_PATH = VectoredReadBenchmark.getTestDataPath();
    /** Name of the system property that selects the data file. */
    static final String DATA_PATH_PROPERTY = "bench.data";
    /** Size in bytes of each individual read issued by the benchmarks (64 KiB). */
    static final int READ_SIZE = 65536;
    /** Distance in bytes between the start offsets of consecutive reads (1 MiB). */
    static final long SEEK_SIZE = 0x100000L;

    /**
     * Resolves the location of the benchmark data file.
     *
     * @return a path built from the {@code bench.data} system property, or
     *         {@code /tmp/taxi.orc} when that property is not set
     */
    static Path getTestDataPath() {
        // The two-argument lookup returns the default when the property is absent.
        String location = System.getProperty(DATA_PATH_PROPERTY, "/tmp/taxi.orc");
        return new Path(location);
    }

    /**
     * Benchmarks a single vectored read of 100 ranges, each 65536 bytes long and
     * starting 0x100000 bytes after the previous one, then waits for every
     * range's data to become available.
     *
     * @param fsChoice     supplies the file system used to open the data file
     * @param bufferChoice supplies the buffer allocator handed to {@code readVectored}
     * @param blackhole    JMH sink that consumes each range's buffer
     * @throws Exception if opening the file, reading, or waiting for a range fails
     */
    @Benchmark
    public void asyncRead(FileSystemChoice fsChoice, BufferChoice bufferChoice, Blackhole blackhole) throws Exception {
        // try-with-resources: the stream is closed even when a read or a future fails.
        try (FSDataInputStream stream = fsChoice.fs.open(DATA_PATH)) {
            ArrayList<FileRange> ranges = new ArrayList<FileRange>();
            for (int m = 0; m < 100; ++m) {
                // int * long widens to long, so large offsets cannot overflow.
                ranges.add(FileRange.createFileRange(m * 0x100000L, 65536));
            }
            stream.readVectored(ranges, bufferChoice.allocate);
            for (FileRange range : ranges) {
                // Blocks until this range's data is ready.
                blackhole.consume(range.getData().get());
            }
        }
    }

    /**
     * Benchmarks the same access pattern (100 reads of 65536 bytes, 0x100000
     * bytes apart) issued directly against a JDK {@link AsynchronousFileChannel}.
     * Each read is tracked by a {@link FileRangeCallback}; a shared
     * {@link Joiner} blocks until all of them have finished or one has failed.
     *
     * @param bufferChoice supplies the allocator for the per-read buffers
     * @param blackhole    JMH sink that consumes the list of completed ranges
     * @throws Exception if the channel cannot be opened, a read fails, or the
     *                   wait is interrupted
     */
    @Benchmark
    public void asyncFileChanArray(BufferChoice bufferChoice, Blackhole blackhole) throws Exception {
        java.nio.file.Path path = FileSystems.getDefault().getPath(DATA_PATH.toString());
        ArrayList<FileRangeCallback> ranges = new ArrayList<FileRangeCallback>();
        // try-with-resources: the channel is closed even when join() reports a failed read.
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            Joiner joiner = new Joiner(100);
            for (int m = 0; m < 100; ++m) {
                ByteBuffer buffer = bufferChoice.allocate.apply(65536);
                FileRangeCallback range = new FileRangeCallback(channel, m * 0x100000L, 65536, joiner, buffer);
                ranges.add(range);
                // The range object serves as both the attachment and the completion handler.
                channel.read(buffer, range.getOffset(), range, range);
            }
            joiner.join();
        }
        blackhole.consume(ranges);
    }

    /**
     * Benchmarks the baseline: 100 sequentially issued positional
     * {@code readFully} calls of 65536 bytes each, 0x100000 bytes apart.
     *
     * @param fsChoice  supplies the file system used to open the data file
     * @param blackhole JMH sink that consumes the list of filled buffers
     * @throws Exception if opening the file or any read fails
     */
    @Benchmark
    public void syncRead(FileSystemChoice fsChoice, Blackhole blackhole) throws Exception {
        // try-with-resources: the stream is closed even when a read throws.
        try (FSDataInputStream stream = fsChoice.fs.open(DATA_PATH)) {
            ArrayList<byte[]> result = new ArrayList<byte[]>();
            for (int m = 0; m < 100; ++m) {
                byte[] buffer = new byte[65536];
                stream.readFully(m * 0x100000L, buffer);
                result.add(buffer);
            }
            blackhole.consume(result);
        }
    }

    /**
     * Runs every benchmark in this class through the JMH runner, using one fork.
     *
     * @param args {@code args[0]} is the path of the data file; it is forwarded
     *             to the forked JVM as the {@code bench.data} system property
     * @throws IllegalArgumentException if no data file path is supplied
     * @throws Exception                if the JMH run fails
     */
    public static void main(String[] args) throws Exception {
        // Fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException("Usage: VectoredReadBenchmark <data-file>");
        }
        OptionsBuilder opts = new OptionsBuilder();
        opts.include("VectoredReadBenchmark");
        opts.jvmArgs("-server", "-Xms256m", "-Xmx2g", "-Dbench.data=" + args[0]);
        opts.forks(1);
        new Runner(opts.build()).run();
    }

    /**
     * A file range that is also the completion handler for its own asynchronous
     * reads. It keeps re-issuing reads on the channel until {@code getLength()}
     * bytes have been received, then flips the buffer and reports to the
     * {@link Joiner}. A follow-up read is only started from inside the
     * completion of the previous one.
     */
    static class FileRangeCallback
    extends FileRangeImpl
    implements CompletionHandler<Integer, FileRangeCallback> {
        private final AsynchronousFileChannel channel;
        private final ByteBuffer buffer;
        /** Number of bytes received so far for this range. */
        private int completed = 0;
        private final Joiner joiner;

        /**
         * @param channel channel on which follow-up reads are issued
         * @param offset  file offset at which the range starts
         * @param length  number of bytes to read
         * @param joiner  notified when the range finishes or fails
         * @param buffer  destination buffer for the data
         */
        FileRangeCallback(AsynchronousFileChannel channel, long offset, int length, Joiner joiner, ByteBuffer buffer) {
            super(offset, length, null);
            this.channel = channel;
            this.joiner = joiner;
            this.buffer = buffer;
        }

        /**
         * Handles one completed read: reports end-of-file as a failure, issues
         * another read while the range is incomplete, or finalizes the buffer.
         *
         * @param result     number of bytes read, or -1 at end of file
         * @param attachment unused; the handler operates on {@code this}
         */
        @Override
        public void completed(Integer result, FileRangeCallback attachment) {
            int bytes = result;
            if (bytes == -1) {
                this.failed(new EOFException("Read past end of file"), this);
                // Stop here: falling through would add -1 to the byte count and
                // issue another read that hits end of file again, indefinitely.
                return;
            }
            this.completed += bytes;
            if (this.completed < this.getLength()) {
                // Short read: continue from where the previous read stopped.
                this.channel.read(this.buffer, this.getOffset() + (long)this.completed, this, this);
            } else {
                this.buffer.flip();
                this.joiner.finish();
            }
        }

        /**
         * Forwards a read failure to the joiner.
         *
         * @param exc        cause of the failure
         * @param attachment unused; the handler operates on {@code this}
         */
        @Override
        public void failed(Throwable exc, FileRangeCallback attachment) {
            this.joiner.failed(exc, (FileRange)this);
        }
    }

    /**
     * Monitor-based rendezvous for a fixed number of asynchronous operations.
     * Every {@link #finish()} or {@link #completed} call decrements a shared
     * counter; {@link #join()} blocks until the counter reaches zero or a
     * failure has been recorded. All state is accessed under the instance lock.
     */
    static class Joiner
    implements CompletionHandler<ByteBuffer, FileRange> {
        /** Operations still outstanding. */
        private int remaining;
        /** Buffers delivered through {@link #completed}; {@link #finish()} stores nothing here. */
        private final ByteBuffer[] result;
        /** Most recently reported failure, or {@code null} if none. */
        private Throwable exception;

        /**
         * @param total number of operations that must finish before {@link #join()} returns
         */
        Joiner(int total) {
            this.result = new ByteBuffer[total];
            this.remaining = total;
        }

        /** Marks one operation as done without recording a buffer. */
        synchronized void finish() {
            if (--this.remaining == 0) {
                this.notify();
            }
        }

        /**
         * Waits until every operation is done or a failure has been reported.
         *
         * @return the result array
         * @throws InterruptedException if the waiting thread is interrupted
         * @throws IOException          wrapping the recorded failure, if any
         */
        synchronized ByteBuffer[] join() throws InterruptedException, IOException {
            while (this.exception == null && this.remaining > 0) {
                this.wait();
            }
            if (this.exception == null) {
                return this.result;
            }
            throw new IOException("problem reading", this.exception);
        }

        /**
         * Marks one operation as done and stores its buffer in the slot given by
         * the decremented counter.
         *
         * @param buffer     buffer produced by the operation
         * @param attachment unused
         */
        @Override
        public synchronized void completed(ByteBuffer buffer, FileRange attachment) {
            int slot = --this.remaining;
            this.result[slot] = buffer;
            if (slot == 0) {
                this.notify();
            }
        }

        /**
         * Records a failure and wakes a waiting thread.
         *
         * @param exc        cause of the failure
         * @param attachment unused
         */
        @Override
        public synchronized void failed(Throwable exc, FileRange attachment) {
            this.exception = exc;
            this.notify();
        }
    }

    /**
     * JMH state that selects how read buffers are allocated, driven by the
     * {@code bufferKind} parameter.
     */
    @State(Scope.Thread)
    public static class BufferChoice {
        @Param({"direct", "array"})
        private String bufferKind;
        /** Allocator chosen by {@link #setup()}; maps a size to a new buffer. */
        private IntFunction<ByteBuffer> allocate;

        /**
         * Picks the allocator once per trial: heap buffers for {@code "array"},
         * direct buffers for any other value.
         */
        @Setup(Level.Trial)
        public void setup() {
            if ("array".equals(this.bufferKind)) {
                this.allocate = ByteBuffer::allocate;
            } else {
                this.allocate = ByteBuffer::allocateDirect;
            }
        }
    }

    /**
     * JMH state that selects the file system the benchmarks read through,
     * driven by the {@code fileSystemKind} parameter.
     */
    @State(Scope.Thread)
    public static class FileSystemChoice {
        @Param({"local", "raw"})
        private String fileSystemKind;
        private Configuration conf;
        /** File system chosen by {@link #setup()}. */
        private FileSystem fs;

        /**
         * Creates a fresh configuration once per trial and obtains the local
         * file system from it. For {@code "raw"} the result of
         * {@code getRaw()} is used; for any other value the local file system
         * itself.
         *
         * @throws IllegalArgumentException if the local file system cannot be obtained
         */
        @Setup(Level.Trial)
        public void setup() {
            this.conf = new Configuration();
            try {
                LocalFileSystem localFs = FileSystem.getLocal(this.conf);
                if ("raw".equals(this.fileSystemKind)) {
                    this.fs = localFs.getRaw();
                } else {
                    this.fs = localFs;
                }
            }
            catch (IOException cause) {
                throw new IllegalArgumentException("Can't get filesystem", cause);
            }
        }
    }
}

