/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.nfs.nfs3;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.nfs3.AsyncDataService;
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
import org.apache.hadoop.hdfs.nfs.nfs3.OffsetRange;
import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;

class OpenFileCtx {
    public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
    private static long DUMP_WRITE_WATER_MARK = 0x100000L;
    private final DFSClient client;
    private final IdUserGroup iug;
    private volatile boolean activeState;
    private volatile boolean asyncStatus;
    private volatile long asyncWriteBackStartOffset;
    private AtomicLong nextOffset;
    private final HdfsDataOutputStream fos;
    private Nfs3FileAttributes latestAttr;
    private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
    private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
    private long lastAccessTime;
    private volatile boolean enabledDump;
    private FileOutputStream dumpOut;
    private AtomicLong nonSequentialWriteInMemory;
    private RandomAccessFile raf;
    private final String dumpFilePath;
    private Daemon dumpThread;

    private void updateLastAccessTime() {
        this.lastAccessTime = System.currentTimeMillis();
    }

    private boolean checkStreamTimeout(long streamTimeout) {
        return System.currentTimeMillis() - this.lastAccessTime > streamTimeout;
    }

    long getLastAccessTime() {
        return this.lastAccessTime;
    }

    public long getNextOffset() {
        return this.nextOffset.get();
    }

    boolean getActiveState() {
        return this.activeState;
    }

    boolean hasPendingWork() {
        return this.pendingWrites.size() != 0 || this.pendingCommits.size() != 0;
    }

    private long updateNonSequentialWriteInMemory(long count) {
        long newValue = this.nonSequentialWriteInMemory.addAndGet(count);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Update nonSequentialWriteInMemory by " + count + " new value:" + newValue));
        }
        Preconditions.checkState((newValue >= 0L ? 1 : 0) != 0, (Object)("nonSequentialWriteInMemory is negative after update with count " + count));
        return newValue;
    }

    OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdUserGroup iug) {
        this.fos = fos;
        this.latestAttr = latestAttr;
        this.pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(OffsetRange.ReverseComparatorOnMin);
        this.pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
        this.updateLastAccessTime();
        this.activeState = true;
        this.asyncStatus = false;
        this.asyncWriteBackStartOffset = 0L;
        this.dumpOut = null;
        this.raf = null;
        this.nonSequentialWriteInMemory = new AtomicLong(0L);
        this.dumpFilePath = dumpFilePath;
        this.enabledDump = dumpFilePath != null;
        this.nextOffset = new AtomicLong();
        this.nextOffset.set(latestAttr.getSize());
        try {
            assert (this.nextOffset.get() == this.fos.getPos());
        }
        catch (IOException e) {
            // empty catch block
        }
        this.dumpThread = null;
        this.client = client;
        this.iug = iug;
    }

    public Nfs3FileAttributes getLatestAttr() {
        return this.latestAttr;
    }

    private long getFlushedOffset() throws IOException {
        return this.fos.getPos();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkDump() {
        if (!this.enabledDump) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"Do nothing, dump is disabled.");
            }
            return;
        }
        if (this.nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
            return;
        }
        OpenFileCtx openFileCtx = this;
        synchronized (openFileCtx) {
            if (this.nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)"Asking dumper to dump...");
                }
                if (this.dumpThread == null) {
                    this.dumpThread = new Daemon((Runnable)new Dumper());
                    this.dumpThread.start();
                } else {
                    this.notifyAll();
                }
            }
        }
    }

    private WriteCtx checkRepeatedWriteRequest(WRITE3Request request, Channel channel, int xid) {
        OffsetRange range = new OffsetRange(request.getOffset(), request.getOffset() + (long)request.getCount());
        WriteCtx writeCtx = (WriteCtx)this.pendingWrites.get(range);
        if (writeCtx == null) {
            return null;
        }
        if (xid != writeCtx.getXid()) {
            LOG.warn((Object)("Got a repeated request, same range, with a different xid:" + xid + " xid in old request:" + writeCtx.getXid()));
        }
        return writeCtx;
    }

    public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, AsyncDataService asyncDataService, IdUserGroup iug) {
        if (!this.activeState) {
            LOG.info((Object)("OpenFileCtx is inactive, fileId:" + request.getHandle().getFileId()));
            WccData fileWcc = new WccData(this.latestAttr.getWccAttr(), this.latestAttr);
            WRITE3Response response = new WRITE3Response(5, fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
            Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(new XDR(), xid, (Verifier)new VerifierNone()), xid);
        } else {
            this.updateLastAccessTime();
            WriteCtx existantWriteCtx = this.checkRepeatedWriteRequest(request, channel, xid);
            if (existantWriteCtx != null) {
                if (!existantWriteCtx.getReplied()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Repeated write request which hasn't be served: xid=" + xid + ", drop it."));
                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Repeated write request which is already served: xid=" + xid + ", resend response."));
                    }
                    WccData fileWcc = new WccData(this.latestAttr.getWccAttr(), this.latestAttr);
                    WRITE3Response response = new WRITE3Response(0, fileWcc, request.getCount(), request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
                    Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(new XDR(), xid, (Verifier)new VerifierNone()), xid);
                }
            } else {
                this.receivedNewWriteInternal(dfsClient, request, channel, xid, asyncDataService, iug);
            }
        }
    }

    @VisibleForTesting
    public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
        ByteBuffer data;
        long offset = request.getOffset();
        int count = request.getCount();
        long smallerCount = offset + (long)count - cachedOffset;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)String.format("Got overwrite with appended data (%d-%d), current offset %d, drop the overlapped section (%d-%d) and append new data (%d-%d).", offset, offset + (long)count - 1L, cachedOffset, offset, cachedOffset - 1L, cachedOffset, offset + (long)count - 1L));
        }
        Preconditions.checkState(((data = request.getData()).position() == 0 ? 1 : 0) != 0, (Object)"The write request data has non-zero position");
        data.position((int)(cachedOffset - offset));
        Preconditions.checkState(((long)(data.limit() - data.position()) == smallerCount ? 1 : 0) != 0, (Object)"The write request buffer has wrong limit/position regarding count");
        request.setOffset(cachedOffset);
        request.setCount((int)smallerCount);
    }

    private synchronized WriteCtx addWritesToCache(WRITE3Request request, Channel channel, int xid) {
        WriteCtx oldWriteCtx;
        long offset = request.getOffset();
        int count = request.getCount();
        long cachedOffset = this.nextOffset.get();
        int originalCount = -1;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("requesed offset=" + offset + " and current offset=" + cachedOffset));
        }
        if (offset < cachedOffset && offset + (long)count > cachedOffset) {
            LOG.warn((Object)String.format("Got overwrite with appended data (%d-%d), current offset %d, drop the overlapped section (%d-%d) and append new data (%d-%d).", offset, offset + (long)count - 1L, cachedOffset, offset, cachedOffset - 1L, cachedOffset, offset + (long)count - 1L));
            if (!this.pendingWrites.isEmpty()) {
                LOG.warn((Object)"There are other pending writes, fail this jumbo write");
                return null;
            }
            LOG.warn((Object)"Modify this write to write only the appended data");
            OpenFileCtx.alterWriteRequest(request, cachedOffset);
            originalCount = count;
            offset = request.getOffset();
            count = request.getCount();
        }
        if (offset < cachedOffset) {
            LOG.warn((Object)("(offset,count,nextOffset):(" + offset + "," + count + "," + this.nextOffset + ")"));
            return null;
        }
        WriteCtx.DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP : WriteCtx.DataState.ALLOW_DUMP;
        WriteCtx writeCtx = new WriteCtx(request.getHandle(), request.getOffset(), request.getCount(), originalCount, request.getStableHow(), request.getData(), channel, xid, false, dataState);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Add new write to the list with nextOffset " + cachedOffset + " and requesed offset=" + offset));
        }
        if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
            this.updateNonSequentialWriteInMemory(count);
        }
        if ((oldWriteCtx = this.checkRepeatedWriteRequest(request, channel, xid)) == null) {
            this.addWrite(writeCtx);
        } else {
            LOG.warn((Object)("Got a repeated request, same range, with xid:" + writeCtx.getXid()));
        }
        return writeCtx;
    }

    private void processOverWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, IdUserGroup iug) {
        WRITE3Response response;
        WccData wccData = new WccData(this.latestAttr.getWccAttr(), null);
        long offset = request.getOffset();
        int count = request.getCount();
        Nfs3Constant.WriteStableHow stableHow = request.getStableHow();
        long cachedOffset = this.nextOffset.get();
        if (offset + (long)count > cachedOffset) {
            LOG.warn((Object)"Treat this jumbo write as a real random write, no support.");
            response = new WRITE3Response(22, wccData, 0, Nfs3Constant.WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"Process perfectOverWrite");
            }
            response = this.processPerfectOverWrite(dfsClient, offset, count, stableHow, request.getData().array(), Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
        }
        this.updateLastAccessTime();
        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(new XDR(), xid, (Verifier)new VerifierNone()), xid);
    }

    private synchronized boolean checkAndStartWrite(AsyncDataService asyncDataService, WriteCtx writeCtx) {
        if (writeCtx.getOffset() == this.nextOffset.get()) {
            if (!this.asyncStatus) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Trigger the write back task. Current nextOffset: " + this.nextOffset.get()));
                }
                this.asyncStatus = true;
                this.asyncWriteBackStartOffset = writeCtx.getOffset();
                asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
            } else if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"The write back thread is working.");
            }
            return true;
        }
        return false;
    }

    private void receivedNewWriteInternal(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, AsyncDataService asyncDataService, IdUserGroup iug) {
        Nfs3Constant.WriteStableHow stableHow = request.getStableHow();
        WccAttr preOpAttr = this.latestAttr.getWccAttr();
        int count = request.getCount();
        WriteCtx writeCtx = this.addWritesToCache(request, channel, xid);
        if (writeCtx == null) {
            this.processOverWrite(dfsClient, request, channel, xid, iug);
        } else {
            boolean startWriting = this.checkAndStartWrite(asyncDataService, writeCtx);
            if (!startWriting) {
                this.checkDump();
                if (stableHow != Nfs3Constant.WriteStableHow.UNSTABLE) {
                    LOG.info((Object)("Have to change stable write to unstable write:" + request.getStableHow()));
                    stableHow = Nfs3Constant.WriteStableHow.UNSTABLE;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("UNSTABLE write request, send response for offset: " + writeCtx.getOffset()));
                }
                WccData fileWcc = new WccData(preOpAttr, this.latestAttr);
                WRITE3Response response = new WRITE3Response(0, fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
                Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(new XDR(), xid, (Verifier)new VerifierNone()), xid);
                writeCtx.setReplied(true);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private WRITE3Response processPerfectOverWrite(DFSClient dfsClient, long offset, int count, Nfs3Constant.WriteStableHow stableHow, byte[] data, String path, WccData wccData, IdUserGroup iug) {
        FSDataInputStream fis;
        int readCount;
        byte[] readbuffer;
        WRITE3Response response;
        block11: {
            response = null;
            readbuffer = new byte[count];
            readCount = 0;
            fis = null;
            try {
                this.fos.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
            }
            catch (ClosedChannelException closedException) {
                LOG.info((Object)"The FSDataOutputStream has been closed. Continue processing the perfect overwrite.");
            }
            catch (IOException e) {
                LOG.info((Object)("hsync failed when processing possible perfect overwrite, path=" + path + " error:" + e));
                return new WRITE3Response(5, wccData, 0, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
            }
            try {
                fis = new FSDataInputStream((InputStream)dfsClient.open(path));
                readCount = fis.read(offset, readbuffer, 0, count);
                if (readCount >= count) break block11;
                LOG.error((Object)("Can't read back " + count + " bytes, partial read size:" + readCount));
                WRITE3Response e = new WRITE3Response(5, wccData, 0, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
            }
            catch (IOException e) {
                WRITE3Response wRITE3Response;
                try {
                    LOG.info((Object)("Read failed when processing possible perfect overwrite, path=" + path), (Throwable)e);
                    wRITE3Response = new WRITE3Response(5, wccData, 0, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
                }
                catch (Throwable throwable) {
                    IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{fis});
                    throw throwable;
                }
                IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{fis});
                return wRITE3Response;
            }
            IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{fis});
            return e;
        }
        IOUtils.cleanup((Log)LOG, (Closeable[])new Closeable[]{fis});
        BytesWritable.Comparator comparator = new BytesWritable.Comparator();
        if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
            LOG.info((Object)"Perfect overwrite has different content");
            response = new WRITE3Response(22, wccData, 0, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
        } else {
            LOG.info((Object)"Perfect overwrite has same content, updating the mtime, then return success");
            Nfs3FileAttributes postOpAttr = null;
            try {
                dfsClient.setTimes(path, System.currentTimeMillis(), -1L);
                postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug);
            }
            catch (IOException e) {
                LOG.info((Object)("Got error when processing perfect overwrite, path=" + path + " error:" + e));
                return new WRITE3Response(5, wccData, 0, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
            }
            wccData.setPostOpAttr(postOpAttr);
            response = new WRITE3Response(0, wccData, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
        }
        return response;
    }

    public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
        if (!fromRead) {
            Preconditions.checkState((channel != null && preOpAttr != null ? 1 : 0) != 0);
            this.updateLastAccessTime();
        }
        Preconditions.checkState((commitOffset >= 0L ? 1 : 0) != 0);
        COMMIT_STATUS ret = this.checkCommitInternal(commitOffset, channel, xid, preOpAttr, fromRead);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Got commit status: " + ret.name()));
        }
        if (ret == COMMIT_STATUS.COMMIT_DO_SYNC || ret == COMMIT_STATUS.COMMIT_FINISHED) {
            try {
                this.fos.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
                ret = COMMIT_STATUS.COMMIT_FINISHED;
            }
            catch (ClosedChannelException cce) {
                ret = this.pendingWrites.isEmpty() ? COMMIT_STATUS.COMMIT_FINISHED : COMMIT_STATUS.COMMIT_ERROR;
            }
            catch (IOException e) {
                LOG.error((Object)("Got stream error during data sync:" + e));
                ret = COMMIT_STATUS.COMMIT_ERROR;
            }
        }
        return ret;
    }

    @VisibleForTesting
    synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
        if (!this.activeState) {
            if (this.pendingWrites.isEmpty()) {
                return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
            }
            return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
        }
        long flushed = 0L;
        try {
            flushed = this.getFlushedOffset();
        }
        catch (IOException e) {
            LOG.error((Object)("Can't get flushed offset, error:" + e));
            return COMMIT_STATUS.COMMIT_ERROR;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset));
        }
        if (commitOffset > 0L) {
            if (commitOffset > flushed) {
                if (!fromRead) {
                    CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr);
                    this.pendingCommits.put(commitOffset, commitCtx);
                }
                return COMMIT_STATUS.COMMIT_WAIT;
            }
            return COMMIT_STATUS.COMMIT_DO_SYNC;
        }
        Map.Entry key = this.pendingWrites.firstEntry();
        if (this.pendingWrites.isEmpty()) {
            return COMMIT_STATUS.COMMIT_FINISHED;
        }
        if (!fromRead) {
            long maxOffset = ((OffsetRange)key.getKey()).getMax() - 1L;
            Preconditions.checkState((maxOffset > 0L ? 1 : 0) != 0);
            CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
            this.pendingCommits.put(maxOffset, commitCtx);
        }
        return COMMIT_STATUS.COMMIT_WAIT;
    }

    private void addWrite(WriteCtx writeCtx) {
        long offset = writeCtx.getOffset();
        int count = writeCtx.getCount();
        this.pendingWrites.put(new OffsetRange(offset, offset + (long)count), writeCtx);
    }

    public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
        Preconditions.checkState((streamTimeout >= 10000L ? 1 : 0) != 0);
        if (!this.activeState) {
            return true;
        }
        boolean flag = false;
        if (this.checkStreamTimeout(streamTimeout)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("stream can be closed for fileId:" + fileId));
            }
            flag = true;
        }
        return flag;
    }

    private synchronized WriteCtx offerNextToWrite() {
        if (this.pendingWrites.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("The asyn write task has no pending writes, fileId: " + this.latestAttr.getFileId()));
            }
            this.processCommits(this.nextOffset.get());
            this.asyncStatus = false;
            return null;
        }
        Map.Entry lastEntry = this.pendingWrites.lastEntry();
        OffsetRange range = (OffsetRange)lastEntry.getKey();
        WriteCtx toWrite = (WriteCtx)lastEntry.getValue();
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("range.getMin()=" + range.getMin() + " nextOffset=" + this.nextOffset));
        }
        long offset = this.nextOffset.get();
        if (range.getMin() > offset) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"The next sequencial write has not arrived yet");
            }
            this.processCommits(this.nextOffset.get());
            this.asyncStatus = false;
        } else if (range.getMin() < offset && range.getMax() > offset) {
            LOG.warn((Object)("Got a overlapping write (" + range.getMin() + "," + range.getMax() + "), nextOffset=" + offset + ". Silently drop it now"));
            this.pendingWrites.remove(range);
            this.processCommits(this.nextOffset.get());
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Remove write(" + range.getMin() + "-" + range.getMax() + ") from the list"));
            }
            this.pendingWrites.remove(range);
            this.nextOffset.addAndGet(toWrite.getCount());
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Change nextOffset to " + this.nextOffset.get()));
            }
            return toWrite;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void executeWriteBack() {
        Preconditions.checkState((boolean)this.asyncStatus, (Object)("openFileCtx has false asyncStatus, fileId:" + this.latestAttr.getFileId()));
        long startOffset = this.asyncWriteBackStartOffset;
        try {
            WriteCtx toWrite;
            while (this.activeState && (toWrite = this.offerNextToWrite()) != null) {
                this.doSingleWrite(toWrite);
                this.updateLastAccessTime();
            }
            if (!this.activeState && LOG.isDebugEnabled()) {
                LOG.debug((Object)("The openFileCtx is not active anymore, fileId: " + this.latestAttr.getFileId()));
            }
        }
        finally {
            OpenFileCtx openFileCtx = this;
            synchronized (openFileCtx) {
                if (startOffset == this.asyncWriteBackStartOffset) {
                    this.asyncStatus = false;
                } else {
                    LOG.info((Object)("Another asyn task is already started before this one is finalized. fileId:" + this.latestAttr.getFileId() + " asyncStatus:" + this.asyncStatus + " original startOffset:" + startOffset + " new startOffset:" + this.asyncWriteBackStartOffset + ". Won't change asyncStatus here."));
                }
            }
        }
    }

    private void processCommits(long offset) {
        Preconditions.checkState((offset > 0L ? 1 : 0) != 0);
        long flushedOffset = 0L;
        Map.Entry entry = null;
        int status = 5;
        try {
            flushedOffset = this.getFlushedOffset();
            entry = this.pendingCommits.firstEntry();
            if (entry == null || ((CommitCtx)entry.getValue()).offset > flushedOffset) {
                return;
            }
            this.fos.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
            status = 0;
        }
        catch (ClosedChannelException cce) {
            if (!this.pendingWrites.isEmpty()) {
                LOG.error((Object)("Can't sync for fileId: " + this.latestAttr.getFileId() + ". Channel closed with writes pending."), (Throwable)cce);
            }
            status = 5;
        }
        catch (IOException e) {
            LOG.error((Object)"Got stream error during data sync:", (Throwable)e);
            status = 5;
        }
        try {
            this.latestAttr = Nfs3Utils.getFileAttr(this.client, Nfs3Utils.getFileIdPath(this.latestAttr.getFileId()), this.iug);
        }
        catch (IOException e) {
            LOG.error((Object)("Can't get new file attr, fileId: " + this.latestAttr.getFileId()), (Throwable)e);
            status = 5;
        }
        if (this.latestAttr.getSize() != offset) {
            LOG.error((Object)("After sync, the expect file size: " + offset + ", however actual file size is: " + this.latestAttr.getSize()));
            status = 5;
        }
        WccData wccData = new WccData(Nfs3Utils.getWccAttr(this.latestAttr), this.latestAttr);
        while (entry != null && ((CommitCtx)entry.getValue()).offset <= flushedOffset) {
            this.pendingCommits.remove(entry.getKey());
            CommitCtx commit = (CommitCtx)entry.getValue();
            COMMIT3Response response = new COMMIT3Response(status, wccData, Nfs3Constant.WRITE_COMMIT_VERF);
            Nfs3Utils.writeChannelCommit(commit.getChannel(), response.writeHeaderAndResponse(new XDR(), commit.getXid(), (Verifier)new VerifierNone()), commit.getXid());
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("FileId: " + this.latestAttr.getFileId() + " Service time:" + (System.currentTimeMillis() - commit.getStartTime()) + "ms. Sent response for commit:" + commit));
            }
            entry = this.pendingCommits.firstEntry();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doSingleWrite(WriteCtx writeCtx) {
        Channel channel = writeCtx.getChannel();
        int xid = writeCtx.getXid();
        long offset = writeCtx.getOffset();
        int count = writeCtx.getCount();
        Nfs3Constant.WriteStableHow stableHow = writeCtx.getStableHow();
        FileHandle handle = writeCtx.getHandle();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("do write, fileId: " + handle.getFileId() + " offset: " + offset + " length:" + count + " stableHow:" + stableHow.name()));
        }
        try {
            writeCtx.writeData(this.fos);
            long flushedOffset = this.getFlushedOffset();
            if (flushedOffset != offset + (long)count) {
                throw new IOException("output stream is out of sync, pos=" + flushedOffset + " and nextOffset should be" + (offset + (long)count));
            }
            if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
                WriteCtx writeCtx2 = writeCtx;
                synchronized (writeCtx2) {
                    if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
                        writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
                        this.updateNonSequentialWriteInMemory(-count);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("After writing " + handle.getFileId() + " at offset " + offset + ", updated the memory count, new value:" + this.nonSequentialWriteInMemory.get()));
                        }
                    }
                }
            }
            if (!writeCtx.getReplied()) {
                if (stableHow != Nfs3Constant.WriteStableHow.UNSTABLE) {
                    LOG.info((Object)("Do sync for stable write:" + writeCtx));
                    try {
                        if (stableHow == Nfs3Constant.WriteStableHow.DATA_SYNC) {
                            this.fos.hsync();
                        } else {
                            Preconditions.checkState((stableHow == Nfs3Constant.WriteStableHow.FILE_SYNC ? 1 : 0) != 0, (Object)("Unknown WriteStableHow:" + stableHow));
                            this.fos.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
                        }
                    }
                    catch (IOException e) {
                        LOG.error((Object)("hsync failed with writeCtx:" + writeCtx), (Throwable)e);
                        throw e;
                    }
                }
                WccAttr preOpAttr = this.latestAttr.getWccAttr();
                WccData fileWcc = new WccData(preOpAttr, this.latestAttr);
                if (writeCtx.getOriginalCount() != -1) {
                    LOG.warn((Object)("Return original count:" + writeCtx.getOriginalCount() + " instead of real data count:" + count));
                    count = writeCtx.getOriginalCount();
                }
                WRITE3Response response = new WRITE3Response(0, fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
                Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(new XDR(), xid, (Verifier)new VerifierNone()), xid);
            }
            this.processCommits(writeCtx.getOffset() + (long)writeCtx.getCount());
        }
        catch (IOException e) {
            LOG.error((Object)("Error writing to fileId " + handle.getFileId() + " at offset " + offset + " and length " + count), (Throwable)e);
            if (!writeCtx.getReplied()) {
                WRITE3Response response = new WRITE3Response(5);
                Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(new XDR(), xid, (Verifier)new VerifierNone()), xid);
            }
            LOG.info((Object)("Clean up open file context for fileId: " + this.latestAttr.getFileId()));
            this.cleanup();
        }
    }

    synchronized void cleanup() {
        if (!this.activeState) {
            LOG.info((Object)"Current OpenFileCtx is already inactive, no need to cleanup.");
            return;
        }
        this.activeState = false;
        if (this.dumpThread != null && this.dumpThread.isAlive()) {
            this.dumpThread.interrupt();
            try {
                this.dumpThread.join(3000L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
        }
        try {
            if (this.fos != null) {
                this.fos.close();
            }
        }
        catch (IOException e) {
            LOG.info((Object)("Can't close stream for fileId:" + this.latestAttr.getFileId() + ", error:" + e));
        }
        LOG.info((Object)("There are " + this.pendingWrites.size() + " pending writes."));
        WccAttr preOpAttr = this.latestAttr.getWccAttr();
        while (!this.pendingWrites.isEmpty()) {
            OffsetRange key = (OffsetRange)this.pendingWrites.firstKey();
            LOG.info((Object)("Fail pending write: (" + key.getMin() + "," + key.getMax() + "), nextOffset=" + this.nextOffset.get()));
            WriteCtx writeCtx = (WriteCtx)this.pendingWrites.remove(key);
            if (writeCtx.getReplied()) continue;
            WccData fileWcc = new WccData(preOpAttr, this.latestAttr);
            WRITE3Response response = new WRITE3Response(5, fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
            Nfs3Utils.writeChannel(writeCtx.getChannel(), response.writeHeaderAndResponse(new XDR(), writeCtx.getXid(), (Verifier)new VerifierNone()), writeCtx.getXid());
        }
        if (this.dumpOut != null) {
            try {
                this.dumpOut.close();
            }
            catch (IOException e) {
                LOG.error((Object)("Failed to close outputstream of dump file" + this.dumpFilePath), (Throwable)e);
            }
            File dumpFile = new File(this.dumpFilePath);
            if (dumpFile.exists() && !dumpFile.delete()) {
                LOG.error((Object)("Failed to delete dumpfile: " + dumpFile));
            }
        }
        if (this.raf != null) {
            try {
                this.raf.close();
            }
            catch (IOException e) {
                LOG.error((Object)"Got exception when closing input stream of dump file.", (Throwable)e);
            }
        }
    }

    @VisibleForTesting
    ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest() {
        return this.pendingWrites;
    }

    @VisibleForTesting
    ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest() {
        return this.pendingCommits;
    }

    @VisibleForTesting
    long getNextOffsetForTest() {
        return this.nextOffset.get();
    }

    @VisibleForTesting
    void setNextOffsetForTest(long newValue) {
        this.nextOffset.set(newValue);
    }

    @VisibleForTesting
    void setActiveStatusForTest(boolean activeState) {
        this.activeState = activeState;
    }

    public String toString() {
        return String.format("activeState: %b asyncStatus: %b nextOffset: %d", this.activeState, this.asyncStatus, this.nextOffset.get());
    }

    class Dumper
    implements Runnable {
        Dumper() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void dump() {
            if (OpenFileCtx.this.dumpOut == null) {
                LOG.info((Object)("Create dump file:" + OpenFileCtx.this.dumpFilePath));
                File dumpFile = new File(OpenFileCtx.this.dumpFilePath);
                try {
                    Dumper dumper = this;
                    synchronized (dumper) {
                        Preconditions.checkState((boolean)dumpFile.createNewFile(), (String)"The dump file should not exist: %s", (Object[])new Object[]{OpenFileCtx.this.dumpFilePath});
                        OpenFileCtx.this.dumpOut = new FileOutputStream(dumpFile);
                    }
                }
                catch (IOException e) {
                    LOG.error((Object)("Got failure when creating dump stream " + OpenFileCtx.this.dumpFilePath), (Throwable)e);
                    OpenFileCtx.this.enabledDump = false;
                    if (OpenFileCtx.this.dumpOut != null) {
                        try {
                            OpenFileCtx.this.dumpOut.close();
                        }
                        catch (IOException e1) {
                            LOG.error((Object)("Can't close dump stream " + OpenFileCtx.this.dumpFilePath), (Throwable)e);
                        }
                    }
                    return;
                }
            }
            if (OpenFileCtx.this.raf == null) {
                try {
                    OpenFileCtx.this.raf = new RandomAccessFile(OpenFileCtx.this.dumpFilePath, "r");
                }
                catch (FileNotFoundException e) {
                    LOG.error((Object)("Can't get random access to file " + OpenFileCtx.this.dumpFilePath));
                    OpenFileCtx.this.enabledDump = false;
                    return;
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Start dump. Before dump, nonSequentialWriteInMemory == " + OpenFileCtx.this.nonSequentialWriteInMemory.get()));
            }
            Iterator it = OpenFileCtx.this.pendingWrites.keySet().iterator();
            while (OpenFileCtx.this.activeState && it.hasNext() && OpenFileCtx.this.nonSequentialWriteInMemory.get() > 0L) {
                OffsetRange key = (OffsetRange)it.next();
                WriteCtx writeCtx = (WriteCtx)OpenFileCtx.this.pendingWrites.get(key);
                if (writeCtx == null) continue;
                try {
                    long dumpedDataSize = writeCtx.dumpData(OpenFileCtx.this.dumpOut, OpenFileCtx.this.raf);
                    if (dumpedDataSize <= 0L) continue;
                    OpenFileCtx.this.updateNonSequentialWriteInMemory(-dumpedDataSize);
                }
                catch (IOException e) {
                    LOG.error((Object)("Dump data failed:" + writeCtx + " with error:" + e + " OpenFileCtx state:" + OpenFileCtx.this.activeState));
                    OpenFileCtx.this.enabledDump = false;
                    return;
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("After dump, nonSequentialWriteInMemory == " + OpenFileCtx.this.nonSequentialWriteInMemory.get()));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (OpenFileCtx.this.activeState && OpenFileCtx.this.enabledDump) {
                try {
                    if (OpenFileCtx.this.nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
                        this.dump();
                    }
                    OpenFileCtx openFileCtx = OpenFileCtx.this;
                    synchronized (openFileCtx) {
                        if (OpenFileCtx.this.nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
                            try {
                                OpenFileCtx.this.wait();
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug((Object)"Dumper woke up");
                                }
                            }
                            catch (InterruptedException e) {
                                LOG.info((Object)("Dumper is interrupted, dumpFilePath= " + OpenFileCtx.this.dumpFilePath));
                            }
                        }
                    }
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug((Object)("Dumper checking OpenFileCtx activeState: " + OpenFileCtx.this.activeState + " enabledDump: " + OpenFileCtx.this.enabledDump));
                }
                catch (Throwable t) {
                    LOG.info((Object)("Dumper get Throwable: " + t + ". dumpFilePath: " + OpenFileCtx.this.dumpFilePath), t);
                }
            }
        }
    }

    static class CommitCtx {
        private final long offset;
        private final Channel channel;
        private final int xid;
        private final Nfs3FileAttributes preOpAttr;
        private final long startTime;

        long getOffset() {
            return this.offset;
        }

        Channel getChannel() {
            return this.channel;
        }

        int getXid() {
            return this.xid;
        }

        Nfs3FileAttributes getPreOpAttr() {
            return this.preOpAttr;
        }

        long getStartTime() {
            return this.startTime;
        }

        CommitCtx(long offset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
            this.offset = offset;
            this.channel = channel;
            this.xid = xid;
            this.preOpAttr = preOpAttr;
            this.startTime = System.currentTimeMillis();
        }

        public String toString() {
            return String.format("offset: %d xid: %d startTime: %d", this.offset, this.xid, this.startTime);
        }
    }

    static enum COMMIT_STATUS {
        COMMIT_FINISHED,
        COMMIT_WAIT,
        COMMIT_INACTIVE_CTX,
        COMMIT_INACTIVE_WITH_PENDING_WRITE,
        COMMIT_ERROR,
        COMMIT_DO_SYNC;

    }
}

