/*
 * Decompiled with CFR 0.152.
 */
package oadd.org.apache.drill.exec.work.batch;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import oadd.com.google.common.base.Joiner;
import oadd.com.google.common.base.Preconditions;
import oadd.com.google.common.base.Stopwatch;
import oadd.com.google.common.collect.Queues;
import oadd.io.netty.buffer.ByteBuf;
import oadd.io.netty.buffer.DrillBuf;
import oadd.org.apache.drill.exec.memory.BufferAllocator;
import oadd.org.apache.drill.exec.ops.FragmentContext;
import oadd.org.apache.drill.exec.proto.BitData;
import oadd.org.apache.drill.exec.proto.ExecProtos;
import oadd.org.apache.drill.exec.proto.helper.QueryIdHelper;
import oadd.org.apache.drill.exec.record.RawFragmentBatch;
import oadd.org.apache.drill.exec.store.LocalSyncableFileSystem;
import oadd.org.apache.drill.exec.work.batch.BaseRawBatchBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpoolingRawBatchBuffer
extends BaseRawBatchBuffer<RawFragmentBatchWrapper> {
    // Shared logger for the buffer, its wrappers and the spooler thread.
    static final Logger logger = LoggerFactory.getLogger(SpoolingRawBatchBuffer.class);
    // Hadoop configuration key that initSpooler() points at LocalSyncableFileSystem.
    // NOTE(review): never reassigned in this file; could be declared final.
    private static String DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl";
    // Fraction of the threshold below which spooling is paused; upkeep() compares
    // against the same value (the decompiled code there uses the inlined literal 0.5f).
    private static final float STOP_SPOOLING_FRACTION = 0.5f;
    // 0x100000 = 1,048,576. The constructor passes this same value (as a literal) as the
    // third argument of getNewChildAllocator; presumably the initial reservation in bytes.
    public static final long ALLOCATOR_INITIAL_RESERVATION = 0x100000L;
    // Passed (as a literal) as the fourth argument of getNewChildAllocator;
    // presumably the maximum reservation in bytes.
    public static final long ALLOCATOR_MAX_RESERVATION = 20000000000L;
    // Child allocator used by readFromStream() to allocate buffers for batches read back from disk.
    private final BufferAllocator allocator;
    // Value of config key "drill.exec.buffer.spooling.size"; when the tracked in-memory
    // size exceeds it, enqueueInner() starts spooling.
    private final long threshold;
    // Expected sending major fragment id (asserted in enqueueInner); also part of the
    // spool file path and the spooler thread name.
    private final int oppositeId;
    // Index of this buffer; part of the spool file path and the spooler thread name.
    private final int bufferIndex;
    // Current spooling state. Not explicitly initialized, so it is null until the first
    // transition made through setSpoolingState().
    private volatile SpoolingState spoolingState;
    // Running total of buffered body bytes: increased in enqueueInner(), decreased in upkeep().
    // NOTE(review): += / -= on a volatile field are not atomic; confirm that the enqueue and
    // upkeep paths cannot race, or switch to an AtomicLong.
    private volatile long currentSizeInMemory = 0L;
    // Background writer thread; created lazily by initSpooler().
    private volatile Spooler spooler;
    // File system, file path and output stream of the spool file; all assigned in initSpooler()
    // and null until then.
    private FileSystem fs;
    private Path path;
    private FSDataOutputStream outputStream;

    /**
     * Creates a spooling buffer for one incoming stream of a fragment.
     *
     * @param context       fragment context; supplies the child allocator and the configuration
     * @param fragmentCount forwarded unchanged to the base class constructor
     * @param oppositeId    major fragment id of the sender feeding this buffer
     * @param bufferIndex   index of this buffer, used to build a unique spool file path
     */
    public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex) {
        super(context, fragmentCount);
        // Same values as before, referenced through the public constants instead of repeated literals.
        this.allocator = context.getNewChildAllocator(
                "SpoolingRawBatchBufer", 100, ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION);
        this.threshold = context.getConfig().getLong("drill.exec.buffer.spooling.size");
        this.bufferIndex = bufferIndex;
        this.oppositeId = oppositeId;
        this.bufferQueue = new SpoolingBufferQueue();
    }

    /**
     * Moves the buffer to {@code newState} unless the transition is disallowed.
     *
     * <p>Two transitions are silently ignored: any request to go to
     * {@link SpoolingState#NOT_SPOOLING}, and any request made once the state is
     * {@link SpoolingState#STOP_SPOOLING} (which is therefore terminal).
     *
     * @param newState the requested state
     */
    private synchronized void setSpoolingState(SpoolingState newState) {
        final boolean backToInitial = newState == SpoolingState.NOT_SPOOLING;
        final boolean alreadyStopped = spoolingState == SpoolingState.STOP_SPOOLING;
        if (!backToInitial && !alreadyStopped) {
            spoolingState = newState;
        }
    }

    /** @return {@code true} only while the state is {@link SpoolingState#SPOOLING} */
    private boolean isCurrentlySpooling() {
        return SpoolingState.SPOOLING == spoolingState;
    }

    /** Requests the {@link SpoolingState#SPOOLING} state; ignored once spooling has been stopped. */
    private void startSpooling() {
        setSpoolingState(SpoolingState.SPOOLING);
    }

    /** Requests the {@link SpoolingState#PAUSE_SPOOLING} state; ignored once spooling has been stopped. */
    private void pauseSpooling() {
        setSpoolingState(SpoolingState.PAUSE_SPOOLING);
    }

    /** @return {@code true} once the state has reached {@link SpoolingState#STOP_SPOOLING} */
    private boolean isSpoolingStopped() {
        return SpoolingState.STOP_SPOOLING == spoolingState;
    }

    /** Moves to {@link SpoolingState#STOP_SPOOLING}; later state requests are ignored by {@code setSpoolingState}. */
    private void stopSpooling() {
        setSpoolingState(SpoolingState.STOP_SPOOLING);
    }

    /**
     * Picks one of the configured temporary directories at random.
     *
     * @return a randomly chosen entry of the {@code drill.exec.tmp.directories} list
     * @throws IllegalArgumentException if the configured list is empty (raised by
     *         {@code ThreadLocalRandom.nextInt} for a non-positive bound)
     */
    public String getDir() {
        final List<String> tmpDirs = this.context.getConfig().getStringList("drill.exec.tmp.directories");
        final int pick = ThreadLocalRandom.current().nextInt(tmpDirs.size());
        return tmpDirs.get(pick);
    }

    /**
     * Lazily opens the spool file and starts the background spooler thread.
     *
     * <p>Synchronized and guarded by a null check, so only the first call does any work.
     *
     * @throws IOException if the file system cannot be obtained or the spool file cannot be created
     */
    private synchronized void initSpooler() throws IOException {
        if (spooler == null) {
            final Configuration hadoopConf = new Configuration();
            hadoopConf.set("fs.defaultFS", context.getConfig().getString("drill.exec.tmp.filesystem"));
            hadoopConf.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());

            fs = FileSystem.get(hadoopConf);
            path = getPath();
            outputStream = fs.create(path);

            // Thread name: <executor thread name>:Spooler-<oppositeId>-<bufferIndex>
            final String suffix = ":Spooler-" + oppositeId + "-" + bufferIndex;
            final String threadName = QueryIdHelper.getExecutorThreadName(context.getHandle()).concat(suffix);
            spooler = new Spooler(threadName);
            spooler.start();
        }
    }

    /**
     * Wraps an incoming batch, hands it to the spooler when spooling is active, and appends it
     * to the buffer queue. Starts spooling once the tracked in-memory size exceeds the threshold.
     *
     * @param batch the incoming batch; its sending major fragment id is asserted to equal {@code oppositeId}
     * @throws IOException if the spooler has to be initialized and that fails
     */
    @Override
    protected void enqueueInner(RawFragmentBatch batch) throws IOException {
        assert (batch.getHeader().getSendingMajorFragmentId() == this.oppositeId);
        logger.debug("Enqueue batch. Current buffer size: {}. Last batch: {}. Sending fragment: {}", this.bufferQueue.size(), batch.getHeader().getIsLastBatch(), batch.getHeader().getSendingMajorFragmentId());

        // Decide once, up front, whether this batch goes to disk; a spooled batch starts out "unavailable".
        final boolean spoolThisBatch = isCurrentlySpooling();
        final RawFragmentBatchWrapper wrapped = new RawFragmentBatchWrapper(batch, !spoolThisBatch);
        currentSizeInMemory = currentSizeInMemory + wrapped.getBodySize();

        if (spoolThisBatch) {
            if (spooler == null) {
                initSpooler();
            }
            spooler.addBatchForSpooling(wrapped);
        }
        bufferQueue.add(wrapped);

        if (spoolThisBatch || currentSizeInMemory <= threshold) {
            return;
        }
        logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", (Object)currentSizeInMemory, (Object)threshold);
        startSpooling();
    }

    /**
     * Aborts this buffer: stops the spooler thread (if one was started) and closes the allocator.
     *
     * <p>The spooler is terminated first and the allocator is closed in a {@code finally}
     * block, so a failure in either step can no longer prevent the other from running.
     * Previously an exception from {@code allocator.close()} left the spooler thread running.
     *
     * @param context unused; the buffer works with the context it was constructed with
     */
    @Override
    public void kill(FragmentContext context) {
        try {
            final Spooler activeSpooler = this.spooler;
            if (activeSpooler != null) {
                activeSpooler.terminate();
            }
        } finally {
            this.allocator.close();
        }
    }

    /**
     * Bookkeeping after a batch has been handed out: flags out-of-memory, subtracts the batch's
     * body capacity from the tracked in-memory size, and pauses spooling once that size drops
     * below {@code STOP_SPOOLING_FRACTION} of the threshold.
     *
     * @param batch the batch that was just taken from the buffer
     */
    @Override
    protected void upkeep(RawFragmentBatch batch) {
        if (context.isOverMemoryLimit()) {
            outOfMemory.set(true);
        }

        final DrillBuf handedOut = batch.getBody();
        if (handedOut != null) {
            currentSizeInMemory = currentSizeInMemory - (long)handedOut.capacity();
        }

        if (isCurrentlySpooling() && (float)currentSizeInMemory < (float)threshold * STOP_SPOOLING_FRACTION) {
            logger.debug("buffer size {} less than {}x threshold. Stop spooling.", (Object)currentSizeInMemory, (Object)Float.valueOf(STOP_SPOOLING_FRACTION));
            pauseSpooling();
        }
        logger.debug("Got batch. Current buffer size: {}", (Object)bufferQueue.size());
    }

    /**
     * Shuts the buffer down: stops and joins the spooler thread, closes the allocator and the
     * spool output stream, optionally deletes the spool file (config key
     * {@code drill.exec.buffer.spooling.delete}), then delegates to {@code super.close()}.
     *
     * <p>An interrupt received while waiting for the spooler thread does not abort the wait,
     * but it is remembered and the caller's interrupt status is restored once cleanup is done
     * (it was previously lost). The status is restored at the very end so that it cannot
     * disturb the stream and file system cleanup in between.
     */
    @Override
    public void close() {
        boolean interrupted = false;
        try {
            if (this.spooler != null) {
                this.spooler.terminate();
                while (this.spooler.isAlive()) {
                    try {
                        this.spooler.join();
                    }
                    catch (InterruptedException e) {
                        logger.warn("Interrupted while waiting for spooling thread to exit");
                        interrupted = true;
                    }
                }
            }
            this.allocator.close();
            try {
                if (this.outputStream != null) {
                    this.outputStream.close();
                }
            }
            catch (IOException e) {
                logger.warn("Failed to cleanup I/O streams", e);
            }
            if (this.context.getConfig().getBoolean("drill.exec.buffer.spooling.delete")) {
                try {
                    // path stays null when initSpooler() failed after obtaining the file system,
                    // so both must be checked before deleting.
                    if (this.fs != null && this.path != null) {
                        this.fs.delete(this.path, false);
                        logger.debug("Deleted file {}", (Object)this.path.toString());
                    }
                }
                catch (IOException e) {
                    logger.warn("Failed to delete temporary files", e);
                }
            }
            super.close();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the spool file path for this buffer.
     *
     * @return {@code <tmp dir>/<query id>/<major>/<minor>/<oppositeId>/<bufferIndex>}, where the
     *         temporary directory is chosen by {@link #getDir()}
     */
    private Path getPath() {
        final ExecProtos.FragmentHandle fragment = context.getHandle();
        final String queryId = QueryIdHelper.getQueryId(fragment.getQueryId());
        final int major = fragment.getMajorFragmentId();
        final int minor = fragment.getMinorFragmentId();
        return new Path(Joiner.on("/").join(getDir(), queryId, major, minor, oppositeId, bufferIndex));
    }

    /**
     * Holder for one queued batch that may live either in memory or in the spool file.
     *
     * <p>A wrapper created as "available" holds its batch in memory and acknowledges it to the
     * sender immediately. A wrapper created as "not available" is expected to be written to disk
     * by {@link #writeToStream}; {@link #get()} then blocks on the latch until the write has
     * finished and reads the batch back with {@link #readFromStream()}.
     */
    class RawFragmentBatchWrapper {
        private RawFragmentBatch batch;
        private volatile boolean available;
        // Count 1 while the batch is waiting to be spooled; 0 once it may be consumed.
        private CountDownLatch latch;
        // Capacity of the body as recorded by writeToStream(); 0 when there is no body.
        private volatile int bodyLength;
        private volatile boolean outOfMemory = false;
        // Offset of this batch's record in the spool file; -1 until written.
        private long start = -1L;
        // Random marker written before the record and verified (by assertion) when reading back.
        private long check;

        /**
         * @param batch     the batch to wrap; must not be null
         * @param available {@code true} if the batch stays in memory, in which case it is
         *                  acknowledged to the sender right away
         */
        public RawFragmentBatchWrapper(RawFragmentBatch batch, boolean available) {
            Preconditions.checkNotNull(batch);
            this.batch = batch;
            this.available = available;
            this.latch = new CountDownLatch(available ? 0 : 1);
            if (available) {
                batch.sendOk();
            }
        }

        /** @return {@code true} if no batch is currently held */
        public boolean isNull() {
            return this.batch == null;
        }

        /**
         * Returns the wrapped batch, reading it back from the spool file first if necessary.
         *
         * @return the batch
         * @throws InterruptedException if interrupted while waiting for the spooler or between read retries
         * @throws IOException          if reading the spool file fails
         */
        public RawFragmentBatch get() throws InterruptedException, IOException {
            if (this.available) {
                assert (this.batch.getHeader() != null) : "batch header null";
                return this.batch;
            }
            // Wait until the spooler has finished with this batch, then load it from disk.
            this.latch.await();
            this.readFromStream();
            this.available = true;
            return this.batch;
        }

        /** @return the readable bytes of the batch body, or 0 when the batch has no body */
        public long getBodySize() {
            if (this.batch.getBody() == null) {
                return 0L;
            }
            assert (this.batch.getBody().readableBytes() >= 0);
            return this.batch.getBody().readableBytes();
        }

        /**
         * Appends this batch to the spool file as: check value (long), delimited header, body
         * bytes; syncs the stream, acknowledges the batch, opens the latch and releases the
         * in-memory body.
         *
         * @param stream the spool file output stream
         * @throws IOException if writing or syncing fails; in that case the body is not released
         *                     and the latch is not opened (unchanged from the previous behavior)
         */
        public void writeToStream(FSDataOutputStream stream) throws IOException {
            Stopwatch watch = Stopwatch.createStarted();
            this.available = false;
            this.check = ThreadLocalRandom.current().nextLong();
            this.start = stream.getPos();
            logger.debug("Writing check value {} at position {}", (Object)this.check, (Object)this.start);
            stream.writeLong(this.check);
            this.batch.getHeader().writeDelimitedTo(stream);
            DrillBuf buf = this.batch.getBody();
            this.bodyLength = buf != null ? buf.capacity() : 0;
            if (this.bodyLength > 0) {
                buf.getBytes(0, stream, this.bodyLength);
            }
            stream.hsync();
            // The file status is only needed for the debug message, so skip the file system
            // call entirely when debug logging is off.
            if (logger.isDebugEnabled()) {
                FileStatus status = SpoolingRawBatchBuffer.this.fs.getFileStatus(SpoolingRawBatchBuffer.this.path);
                long len = status.getLen();
                logger.debug("After spooling batch, stream at position {}. File length {}", (Object)stream.getPos(), (Object)len);
            }
            this.batch.sendOk();
            this.latch.countDown();
            long t = watch.elapsed(TimeUnit.MICROSECONDS);
            // Clamp the divisor: an elapsed time of 0 us used to throw ArithmeticException here,
            // which also skipped the release below.
            logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", t, this.bodyLength, (long)this.bodyLength / Math.max(1L, t));
            if (buf != null) {
                buf.release();
            }
        }

        /**
         * Reads this batch back from the spool file at the offset recorded by
         * {@link #writeToStream} and replaces the wrapped batch with the result.
         *
         * <p>An {@link EOFException} is retried with a doubling sleep (1 ms, 2 ms, ...) and is
         * rethrown once the delay reaches 60000 ms. Any other exception propagates immediately.
         *
         * @throws IOException          if the file cannot be read, or EOF persists past the retry limit
         * @throws InterruptedException if interrupted while sleeping between retries
         */
        public void readFromStream() throws IOException, InterruptedException {
            long pos = this.start;
            boolean tryAgain = true;
            int duration = 0;
            while (tryAgain) {
                // Back off before a retry (0 ms on the first attempt).
                Thread.sleep(duration);
                // Both resources are closed before the catch/finally clauses below run.
                // NOTE(review): closing buf right after handing it to RawFragmentBatch assumes
                // that constructor retains the buffer -- confirm against RawFragmentBatch.
                try (FSDataInputStream stream = SpoolingRawBatchBuffer.this.fs.open(SpoolingRawBatchBuffer.this.path);
                     DrillBuf buf = SpoolingRawBatchBuffer.this.allocator.buffer(this.bodyLength)) {
                    stream.seek(this.start);
                    long currentPos = stream.getPos();
                    long check = stream.readLong();
                    pos = stream.getPos();
                    assert (check == this.check) : String.format("Check values don't match: %d %d, Position %d", this.check, check, currentPos);
                    Stopwatch watch = Stopwatch.createStarted();
                    BitData.FragmentRecordBatch header = BitData.FragmentRecordBatch.parseDelimitedFrom(stream);
                    pos = stream.getPos();
                    assert (header != null) : "header null after parsing from stream";
                    buf.writeBytes(stream, this.bodyLength);
                    pos = stream.getPos();
                    this.batch = new RawFragmentBatch(header, buf, null);
                    this.available = true;
                    this.latch.countDown();
                    long t = watch.elapsed(TimeUnit.MICROSECONDS);
                    // Clamp the divisor so a sub-microsecond read cannot divide by zero.
                    logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, this.bodyLength, (long)this.bodyLength / Math.max(1L, t));
                    tryAgain = false;
                }
                catch (EOFException e) {
                    FileStatus status = SpoolingRawBatchBuffer.this.fs.getFileStatus(SpoolingRawBatchBuffer.this.path);
                    logger.warn("EOF reading from file {} at pos {}. Current file size: {}", SpoolingRawBatchBuffer.this.path, pos, status.getLen());
                    duration = Math.max(1, duration * 2);
                    if (duration >= 60000) {
                        throw e;
                    }
                }
                finally {
                    // Attempt did not complete: release the body of whatever batch is held.
                    // NOTE(review): on an EOF retry this is still the batch whose body
                    // writeToStream() already released -- verify this is not a double release.
                    if (tryAgain && this.batch != null) {
                        DrillBuf body = this.batch.getBody();
                        if (body != null) {
                            body.release();
                        }
                    }
                }
            }
        }

        /** @return whether this wrapper was flagged as an out-of-memory batch */
        private boolean isOutOfMemory() {
            return this.outOfMemory;
        }

        /** @param outOfMemory flag stored on this wrapper and reported by {@link #isOutOfMemory()} */
        private void setOutOfMemory(boolean outOfMemory) {
            this.outOfMemory = outOfMemory;
        }
    }

    private class Spooler
    extends Thread {
        private final LinkedBlockingDeque<RawFragmentBatchWrapper> spoolingQueue;
        private volatile boolean shouldContinue = true;
        private Thread spoolingThread;

        public Spooler(String name) {
            this.setDaemon(true);
            this.setName(name);
            this.spoolingQueue = Queues.newLinkedBlockingDeque();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block12: {
                block9: while (true) {
                    try {
                        while (this.shouldContinue) {
                            RawFragmentBatchWrapper batch;
                            try {
                                batch = this.spoolingQueue.take();
                            }
                            catch (InterruptedException e) {
                                if (this.shouldContinue) {
                                    continue;
                                }
                                break block12;
                            }
                            try {
                                batch.writeToStream(SpoolingRawBatchBuffer.this.outputStream);
                                continue block9;
                            }
                            catch (IOException e) {
                                SpoolingRawBatchBuffer.this.context.fail(e);
                            }
                        }
                        break block12;
                    }
                    catch (Throwable e) {
                        SpoolingRawBatchBuffer.this.context.fail(e);
                        break block12;
                    }
                }
                finally {
                    logger.info("Spooler thread exiting");
                }
            }
        }

        public void addBatchForSpooling(RawFragmentBatchWrapper batchWrapper) {
            if (SpoolingRawBatchBuffer.this.isSpoolingStopped()) {
                this.spoolingQueue.add(batchWrapper);
            } else {
                batchWrapper.available = true;
                batchWrapper.batch.sendOk();
                batchWrapper.latch.countDown();
            }
        }

        public void terminate() {
            SpoolingRawBatchBuffer.this.stopSpooling();
            this.shouldContinue = false;
            if (this.spoolingThread.isAlive()) {
                this.spoolingThread.interrupt();
            }
        }
    }

    /**
     * Queue of batch wrappers backing this buffer. Elements are unwrapped with
     * {@code RawFragmentBatchWrapper.get()} when they are removed, which may block on, and read
     * from, the spool file.
     */
    private class SpoolingBufferQueue
    implements BaseRawBatchBuffer.BufferQueue<RawFragmentBatchWrapper> {
        private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();

        private SpoolingBufferQueue() {
        }

        /**
         * Wraps {@code batch} as an in-memory, out-of-memory-flagged entry and puts it at the
         * head of the queue.
         */
        @Override
        public void addOomBatch(RawFragmentBatch batch) {
            RawFragmentBatchWrapper batchWrapper = new RawFragmentBatchWrapper(batch, true);
            batchWrapper.setOutOfMemory(true);
            this.buffer.addFirst(batchWrapper);
        }

        /**
         * Removes the head of the queue without waiting for one to arrive.
         *
         * @return the unwrapped batch, or {@code null} if the queue is empty or the thread was
         *         interrupted while the batch was being loaded (the interrupt status is restored
         *         in that case so callers can still observe it)
         * @throws IOException if reading a spooled batch fails
         */
        @Override
        public RawFragmentBatch poll() throws IOException {
            RawFragmentBatchWrapper batchWrapper = this.buffer.poll();
            if (batchWrapper == null) {
                return null;
            }
            try {
                return batchWrapper.get();
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt; this signature cannot propagate it as an exception.
                Thread.currentThread().interrupt();
                return null;
            }
        }

        /**
         * Removes the head of the queue, waiting for an element if necessary.
         *
         * @throws IOException          if reading a spooled batch fails
         * @throws InterruptedException if interrupted while waiting
         */
        @Override
        public RawFragmentBatch take() throws IOException, InterruptedException {
            return this.buffer.take().get();
        }

        /**
         * @return {@code true} if the head of the queue is flagged out-of-memory; {@code false}
         *         when it is not or when the queue is empty (an empty queue used to cause a
         *         {@link NullPointerException})
         */
        @Override
        public boolean checkForOutOfMemory() {
            RawFragmentBatchWrapper head = this.buffer.peek();
            return head != null && head.isOutOfMemory();
        }

        /** @return the number of queued wrappers */
        @Override
        public int size() {
            return this.buffer.size();
        }

        /** @return {@code true} if no wrappers are queued */
        @Override
        public boolean isEmpty() {
            return this.buffer.isEmpty();
        }

        /** Appends a wrapper to the tail of the queue. */
        @Override
        public void add(RawFragmentBatchWrapper batchWrapper) {
            this.buffer.add(batchWrapper);
        }
    }

    private static enum SpoolingState {
        NOT_SPOOLING,
        SPOOLING,
        PAUSE_SPOOLING,
        STOP_SPOOLING;

    }
}

