/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.swift.snative;

import java.io.EOFException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.swift.exceptions.SwiftConnectionClosedException;
import org.apache.hadoop.fs.swift.exceptions.SwiftException;
import org.apache.hadoop.fs.swift.http.HttpBodyContent;
import org.apache.hadoop.fs.swift.http.HttpInputStreamWithRelease;
import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore;
import org.apache.hadoop.fs.swift.util.SwiftUtils;

class SwiftNativeInputStream
extends FSInputStream {
    private static final Log LOG = LogFactory.getLog(SwiftNativeInputStream.class);
    private final long bufferSize;
    private final SwiftNativeFileSystemStore nativeStore;
    private final FileSystem.Statistics statistics;
    private HttpInputStreamWithRelease httpStream;
    private final Path path;
    private long pos = 0L;
    private long contentLength = -1L;
    private String reasonClosed = "unopened";
    private long rangeOffset = 0L;

    public SwiftNativeInputStream(SwiftNativeFileSystemStore storeNative, FileSystem.Statistics statistics, Path path, long bufferSize) throws IOException {
        this.nativeStore = storeNative;
        this.statistics = statistics;
        this.path = path;
        if (bufferSize <= 0L) {
            throw new IllegalArgumentException("Invalid buffer size");
        }
        this.bufferSize = bufferSize;
        this.httpStream = storeNative.getObject(path).getInputStream();
    }

    private synchronized void incPos(int offset) {
        this.pos += (long)offset;
        this.rangeOffset += (long)offset;
        SwiftUtils.trace((Log)LOG, (String)"Inc: pos=%d bufferOffset=%d", (Object[])new Object[]{this.pos, this.rangeOffset});
    }

    private synchronized void updateStartOfBufferPosition(long seekPos, long contentLength) {
        this.pos = seekPos;
        this.rangeOffset = 0L;
        this.contentLength = contentLength;
        SwiftUtils.trace((Log)LOG, (String)"Move: pos=%d; bufferOffset=%d; contentLength=%d", (Object[])new Object[]{this.pos, this.rangeOffset, contentLength});
    }

    public synchronized int read() throws IOException {
        int result;
        block4: {
            this.verifyOpen();
            result = -1;
            try {
                result = this.httpStream.read();
            }
            catch (IOException e) {
                String msg = "IOException while reading " + this.path + ": " + e + ", attempting to reopen.";
                LOG.debug((Object)msg, (Throwable)e);
                if (!this.reopenBuffer()) break block4;
                result = this.httpStream.read();
            }
        }
        if (result != -1) {
            this.incPos(1);
        }
        if (this.statistics != null && result != -1) {
            this.statistics.incrementBytesRead(1L);
        }
        return result;
    }

    public synchronized int read(byte[] b, int off, int len) throws IOException {
        int result;
        block4: {
            SwiftUtils.debug((Log)LOG, (String)"read(buffer, %d, %d)", (Object[])new Object[]{off, len});
            SwiftUtils.validateReadArgs((byte[])b, (int)off, (int)len);
            result = -1;
            try {
                this.verifyOpen();
                result = this.httpStream.read(b, off, len);
            }
            catch (IOException e) {
                LOG.info((Object)("Received IOException while reading '" + this.path + "', attempting to reopen: " + e));
                LOG.debug((Object)("IOE on read()" + e), (Throwable)e);
                if (!this.reopenBuffer()) break block4;
                result = this.httpStream.read(b, off, len);
            }
        }
        if (result > 0) {
            this.incPos(result);
            if (this.statistics != null) {
                this.statistics.incrementBytesRead((long)result);
            }
        }
        return result;
    }

    private boolean reopenBuffer() throws IOException {
        this.innerClose("reopening buffer to trigger refresh");
        boolean success = false;
        try {
            this.fillBuffer(this.pos);
            success = true;
        }
        catch (EOFException eof) {
            this.reasonClosed = "End of file";
        }
        return success;
    }

    public synchronized void close() throws IOException {
        this.innerClose("closed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void innerClose(String reason) throws IOException {
        try {
            if (this.httpStream != null) {
                this.reasonClosed = reason;
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Closing HTTP input stream : " + reason));
                }
                this.httpStream.close();
            }
        }
        finally {
            this.httpStream = null;
        }
    }

    private void verifyOpen() throws SwiftConnectionClosedException {
        if (this.httpStream == null) {
            throw new SwiftConnectionClosedException(this.reasonClosed);
        }
    }

    public synchronized String toString() {
        return "SwiftNativeInputStream position=" + this.pos + " buffer size = " + this.bufferSize + " " + (this.httpStream != null ? this.httpStream.toString() : " no input stream: " + this.reasonClosed);
    }

    protected void finalize() throws Throwable {
        if (this.httpStream != null) {
            LOG.error((Object)("Input stream is leaking handles by not being closed() properly: " + this.httpStream.toString()));
        }
    }

    private int chompBytes(long bytes) throws IOException {
        int count = 0;
        if (this.httpStream != null) {
            for (long i = 0L; i < bytes; ++i) {
                int result = this.httpStream.read();
                if (result < 0) {
                    throw new SwiftException("Received error code while chomping input");
                }
                ++count;
                this.incPos(1);
            }
        }
        return count;
    }

    public synchronized void seek(long targetPos) throws IOException {
        if (targetPos < 0L) {
            throw new EOFException("Cannot seek to a negative offset");
        }
        long offset = targetPos - this.pos;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Seek to " + targetPos + "; current pos =" + this.pos + "; offset=" + offset));
        }
        if (offset == 0L) {
            LOG.debug((Object)"seek is no-op");
            return;
        }
        if (offset < 0L) {
            LOG.debug((Object)"seek is backwards");
        } else if (this.rangeOffset + offset < this.bufferSize) {
            SwiftUtils.debug((Log)LOG, (String)"seek is within current stream; pos= %d ; targetPos=%d; offset= %d ; bufferOffset=%d", (Object[])new Object[]{this.pos, targetPos, offset, this.rangeOffset});
            try {
                LOG.debug((Object)"chomping ");
                this.chompBytes(offset);
            }
            catch (IOException e) {
                LOG.debug((Object)"while chomping ", (Throwable)e);
            }
            if (targetPos - this.pos == 0L) {
                LOG.trace((Object)"chomping successful");
                return;
            }
            LOG.trace((Object)"chomping failed");
        } else if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Seek is beyond buffer size of " + this.bufferSize));
        }
        this.innerClose("seeking to " + targetPos);
        this.fillBuffer(targetPos);
    }

    private void fillBuffer(long targetPos) throws IOException {
        long length = targetPos + this.bufferSize;
        SwiftUtils.debug((Log)LOG, (String)"Fetching %d bytes starting at %d", (Object[])new Object[]{length, targetPos});
        HttpBodyContent blob = this.nativeStore.getObject(this.path, targetPos, length);
        this.httpStream = blob.getInputStream();
        this.updateStartOfBufferPosition(targetPos, blob.getContentLength());
    }

    public synchronized long getPos() throws IOException {
        return this.pos;
    }

    public boolean seekToNewSource(long targetPos) throws IOException {
        return false;
    }
}

