/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

class BlockReaderLocal
implements BlockReader {
    /** Logger; note that it is obtained for {@code DFSClient.class}, not for this class. */
    private static final Log LOG = LogFactory.getLog(DFSClient.class);
    /**
     * Per-datanode state keyed by the int port handed to {@code getLocalDatanodeInfo}.
     * Within this file the map is only touched from that static synchronized method.
     */
    private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
    /** Stream the block data is read from. */
    private final FileInputStream dataIn;
    /** Stream the checksums are read from; null when the reader is built without a checksum stream. */
    private FileInputStream checksumIn;
    /**
     * Set by the constructor to {@code startOffset - firstChunkOffset}; {@code read} uses it to
     * position the data buffer and then resets it to 0.
     */
    private int offsetFromChunkBoundary;
    /** Scratch array of {@code bytesPerChecksum} bytes, allocated lazily by {@code skip}. */
    private byte[] skipBuf = null;
    /** Pooled buffer holding up to 64 checksum chunks of data; returned to the pool by {@code close}. */
    private ByteBuffer dataBuff = null;
    /** Pooled buffer holding up to 64 checksums; returned to the pool by {@code close}. */
    private ByteBuffer checksumBuff = null;
    /** Checksum used by {@code read} to verify buffered data; set to null by {@code close}. */
    private DataChecksum checksum;
    /** When false, {@code read} and {@code skip} delegate straight to {@code dataIn}. */
    private final boolean verifyChecksum;
    /** Shared pool that supplies {@code dataBuff} and {@code checksumBuff} for every reader. */
    private static DirectBufferPool bufferPool = new DirectBufferPool();
    /** Cached {@code checksum.getBytesPerChecksum()}. */
    private int bytesPerChecksum;
    /** Cached {@code checksum.getChecksumSize()}. */
    private int checksumSize;
    /** Requested start offset clamped to be non-negative; set to -1 by {@code close}. */
    private long startOffset;
    /** The {@code hdfsfile} name given to the constructor; passed along when verifying checksums. */
    private final String filename;

    /**
     * Creates a reader that reads a block directly from the local files reported by the datanode.
     *
     * <p>The block's local path information is taken from the per-datanode cache when present,
     * otherwise it is requested from the datanode. Whether a checksum stream is opened is decided
     * by {@code skipChecksumCheck(conf)}.
     *
     * @param conf configuration consulted for the skip-checksum setting and for the datanode proxy
     * @param file name passed through to the reader (used when verifying checksums)
     * @param blk block to read
     * @param token block token forwarded to the datanode when path information must be fetched
     * @param node datanode whose IPC port identifies the cache and proxy to use
     * @param socketTimeout timeout forwarded when the datanode proxy is created
     * @param startOffset offset within the block at which reading starts
     * @param length requested length (only logged and passed through here)
     * @return a new reader positioned for {@code startOffset}
     * @throws IOException if no path information is available or the local files cannot be
     *         opened or positioned; in the latter case the cached path entry is removed
     */
    static BlockReaderLocal newBlockReader(Configuration conf, String file, ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout, long startOffset, long length) throws IOException {
        LocalDatanodeInfo localDatanodeInfo = BlockReaderLocal.getLocalDatanodeInfo(node.getIpcPort());
        BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
        if (pathinfo == null) {
            pathinfo = BlockReaderLocal.getBlockPathInfo(blk, node, conf, socketTimeout, token);
        }
        if (pathinfo == null) {
            // getBlockPathInfo may hand back null; fail with an IOException instead of an NPE below.
            throw new IOException("Could not obtain local path information for " + blk);
        }
        FileInputStream dataIn = null;
        FileInputStream checksumIn = null;
        BlockReaderLocal localBlockReader = null;
        boolean skipChecksumCheck = BlockReaderLocal.skipChecksumCheck(conf);
        try {
            File blkfile = new File(pathinfo.getBlockPath());
            dataIn = new FileInputStream(blkfile);
            if (LOG.isDebugEnabled()) {
                LOG.debug("New BlockReaderLocal for file " + blkfile + " of size " + blkfile.length() + " startOffset " + startOffset + " length " + length + " short circuit checksum " + skipChecksumCheck);
            }
            if (!skipChecksumCheck) {
                File metafile = new File(pathinfo.getMetaPath());
                checksumIn = new FileInputStream(metafile);
                BlockMetadataHeader header = BlockMetadataHeader.readHeader(new DataInputStream(checksumIn));
                short version = header.getVersion();
                if (version != 1) {
                    // An unexpected version is only reported; reading continues.
                    LOG.warn("Wrong version (" + version + ") for metadata file for " + blk + " ignoring ...");
                }
                DataChecksum checksum = header.getChecksum();
                // Round the start offset down to the beginning of its checksum chunk.
                long firstChunkOffset = startOffset - startOffset % (long)checksum.getBytesPerChecksum();
                localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, checksum, true, dataIn, firstChunkOffset, checksumIn);
            } else {
                localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, dataIn);
            }
        }
        catch (IOException e) {
            // The cached path did not work; drop it so the next attempt asks the datanode again.
            localDatanodeInfo.removeBlockLocalPathInfo(blk);
            DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk + " from cache because local file " + pathinfo.getBlockPath() + " could not be opened.");
            throw e;
        }
        finally {
            if (localBlockReader == null) {
                // No reader took ownership of the streams, so an exception is already propagating.
                // Close both streams without letting a close failure hide it or skip the second close.
                BlockReaderLocal.closeQuietly(dataIn);
                BlockReaderLocal.closeQuietly(checksumIn);
            }
        }
        return localBlockReader;
    }

    /** Closes {@code stream} if non-null, logging (at debug level) rather than throwing on failure. */
    private static void closeQuietly(FileInputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        }
        catch (IOException e) {
            LOG.debug("Ignoring failure to close stream after a failed open", e);
        }
    }

    /**
     * Returns the {@code LocalDatanodeInfo} registered for {@code port}, creating and
     * registering a new one on first use. Synchronized on the class, which serializes all
     * access to {@code localDatanodeInfoMap} made through this method.
     *
     * @param port key identifying the datanode entry
     * @return the existing or newly created entry; never null
     */
    private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
        LocalDatanodeInfo known = localDatanodeInfoMap.get(port);
        if (known != null) {
            return known;
        }
        LocalDatanodeInfo created = new LocalDatanodeInfo();
        localDatanodeInfoMap.put(port, created);
        return created;
    }

    /**
     * Asks the datanode for the local path information of {@code blk} and, when it returns a
     * non-null answer, stores it in that datanode's cache.
     *
     * @param blk block whose local paths are requested
     * @param node datanode to ask; its IPC port selects the cache/proxy entry
     * @param conf configuration used if a proxy has to be created
     * @param timeout socket timeout used if a proxy has to be created
     * @param token block token sent with the request
     * @return the path information returned by the datanode, which may be null
     * @throws IOException if the request fails; the cached proxy is reset before rethrowing
     */
    private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, Token<BlockTokenIdentifier> token) throws IOException {
        // Use the accessor, as newBlockReader does, so both derive the cache key the same way.
        LocalDatanodeInfo localDatanodeInfo = BlockReaderLocal.getLocalDatanodeInfo(node.getIpcPort());
        BlockLocalPathInfo pathinfo = null;
        ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node, conf, timeout);
        try {
            pathinfo = proxy.getBlockLocalPathInfo(blk, token);
            if (pathinfo != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cached location of block " + blk + " as " + pathinfo);
                }
                localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
            }
        }
        catch (IOException e) {
            // Discard the proxy so the next call builds a fresh one.
            localDatanodeInfo.resetDatanodeProxy();
            throw e;
        }
        return pathinfo;
    }

    /**
     * Reads the {@code dfs.client.read.shortcircuit.skip.checksum} setting.
     *
     * @param conf configuration to query
     * @return the configured value, or false when the key is not set
     */
    private static boolean skipChecksumCheck(Configuration conf) {
        final boolean skipByDefault = false;
        return conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", skipByDefault);
    }

    /**
     * Builds a reader that does not verify checksums: it delegates to the full constructor with
     * a {@code NULL}-type checksum, {@code verifyChecksum} false, no checksum stream, and the
     * first chunk offset equal to {@code startOffset}.
     *
     * @throws IOException if the data stream cannot be positioned at {@code startOffset}
     */
    private BlockReaderLocal(Configuration conf, String hdfsfile, ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset, long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException {
        this(
            conf, hdfsfile, block, token, startOffset, length, pathinfo,
            DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4),
            false,
            dataIn,
            startOffset,
            null);
    }

    private BlockReaderLocal(Configuration conf, String hdfsfile, ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset, long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, FileInputStream checksumIn) throws IOException {
        long skipped;
        this.filename = hdfsfile;
        this.checksum = checksum;
        this.verifyChecksum = verifyChecksum;
        this.startOffset = Math.max(startOffset, 0L);
        this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
        this.checksumSize = this.checksum.getChecksumSize();
        this.dataIn = dataIn;
        this.checksumIn = checksumIn;
        this.offsetFromChunkBoundary = (int)(startOffset - firstChunkOffset);
        this.dataBuff = bufferPool.getBuffer(this.bytesPerChecksum * 64);
        this.checksumBuff = bufferPool.getBuffer(this.checksumSize * 64);
        this.dataBuff.flip();
        this.checksumBuff.flip();
        for (long toSkip = firstChunkOffset; toSkip > 0L; toSkip -= skipped) {
            skipped = dataIn.skip(toSkip);
            if (skipped != 0L) continue;
            throw new IOException("Couldn't initialize input stream");
        }
        if (checksumIn != null) {
            long skipped2;
            for (long checkSumOffset = firstChunkOffset / (long)this.bytesPerChecksum * (long)this.checksumSize; checkSumOffset > 0L; checkSumOffset -= skipped2) {
                skipped2 = checksumIn.skip(checkSumOffset);
                if (skipped2 != 0L) continue;
                throw new IOException("Couldn't initialize checksum input stream");
            }
        }
    }

    /**
     * Reads from {@code stream}'s channel into {@code buf} until the buffer has no space left
     * or the channel reports end of stream.
     *
     * @param stream source whose channel is read
     * @param buf destination buffer
     * @return the first read's result if it is negative; otherwise the total number of bytes
     *         transferred before the buffer filled up or end of stream was reached
     * @throws IOException if the channel read fails
     */
    private int readIntoBuffer(FileInputStream stream, ByteBuffer buf) throws IOException {
        int total = stream.getChannel().read(buf);
        if (total < 0) {
            return total;
        }
        while (buf.hasRemaining()) {
            int count = stream.getChannel().read(buf);
            if (count < 0) {
                // End of stream after a partial fill: report what was read so far.
                break;
            }
            total += count;
        }
        return total;
    }

    /**
     * Reads up to {@code len} bytes into {@code buf} starting at {@code off}.
     *
     * <p>Without checksum verification this delegates directly to the data stream. Otherwise
     * data is served from {@code dataBuff}; when that buffer is exhausted it is refilled together
     * with {@code checksumBuff} and the new contents are verified before anything is returned.
     *
     * @param buf destination array
     * @param off offset in {@code buf} at which to store data
     * @param len maximum number of bytes to copy
     * @return the number of bytes copied, 0 if nothing could be copied from the buffered data,
     *         or -1 if no data was available
     * @throws IOException if reading fails or checksum verification fails
     */
    @Override
    public synchronized int read(byte[] buf, int off, int len) throws IOException {
        if (LOG.isDebugEnabled()) {
            // Logged at debug level to match the guard (and the logging done by skip()).
            LOG.debug("read off " + off + " len " + len);
        }
        if (!this.verifyChecksum) {
            return this.dataIn.read(buf, off, len);
        }
        int dataRead = -1;
        if (this.dataBuff.remaining() == 0) {
            // Buffer exhausted: refill data and checksums, then verify the new data.
            this.dataBuff.clear();
            this.checksumBuff.clear();
            dataRead = this.readIntoBuffer(this.dataIn, this.dataBuff);
            this.readIntoBuffer(this.checksumIn, this.checksumBuff);
            this.checksumBuff.flip();
            this.dataBuff.flip();
            this.checksum.verifyChunkedSums(this.dataBuff, this.checksumBuff, this.filename, this.startOffset);
        } else {
            dataRead = this.dataBuff.remaining();
        }
        if (dataRead > 0) {
            int nRead = Math.min(dataRead - this.offsetFromChunkBoundary, len);
            if (this.offsetFromChunkBoundary > 0) {
                // Step over the bytes between the chunk start and the requested offset; done once.
                this.dataBuff.position(this.offsetFromChunkBoundary);
                this.offsetFromChunkBoundary = 0;
            }
            if (nRead > 0) {
                this.dataBuff.get(buf, off, nRead);
                return nRead;
            }
            return 0;
        }
        return -1;
    }

    /**
     * Skips forward in the block.
     *
     * <p>Without checksum verification this delegates to the data stream. Otherwise there are
     * three cases: the target lies inside the buffered data (reposition the buffer); the target
     * is at most one checksum chunk past the buffered data (discard the buffer and read through
     * the gap); or the target is further away (skip both underlying streams, then read up to the
     * target inside its chunk).
     *
     * @param n number of bytes to skip; values {@code <= 0} skip nothing
     * @return the count reported by the branch taken (see the review notes inside the method)
     * @throws IOException if an underlying stream skips fewer bytes than requested, or reading fails
     */
    @Override
    public synchronized long skip(long n) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("skip " + n);
        }
        if (n <= 0L) {
            return 0L;
        }
        if (!this.verifyChecksum) {
            return this.dataIn.skip(n);
        }
        int remaining = this.dataBuff.remaining();
        int position = this.dataBuff.position();
        // Computed as a long: narrowing n to int first would truncate large skip distances and
        // corrupt both the buffer position and the chunk offset derived from it below.
        long newPosition = (long)position + n;
        if (n <= (long)remaining) {
            assert (this.offsetFromChunkBoundary == 0);
            // n <= remaining, so newPosition is within the buffer limit and fits in an int.
            this.dataBuff.position((int)newPosition);
            return n;
        }
        if (n - (long)remaining <= (long)this.bytesPerChecksum) {
            // Small gap: drop what is buffered and read through the rest.
            this.dataBuff.position(position + remaining);
            if (this.skipBuf == null) {
                this.skipBuf = new byte[this.bytesPerChecksum];
            }
            // NOTE(review): the value returned is only what read() consumed; the `remaining`
            // buffered bytes discarded above are not counted. Confirm what callers expect.
            int ret = this.read(this.skipBuf, 0, (int)(n - (long)remaining));
            return ret;
        }
        // Large gap: skip the streams to the chunk containing the target.
        this.offsetFromChunkBoundary = (int)(newPosition % (long)this.bytesPerChecksum);
        long toskip = n - (long)remaining - (long)this.offsetFromChunkBoundary;
        // NOTE(review): clear() leaves remaining() equal to the buffer capacity, while read()
        // only refills when remaining() == 0 -- verify that the read() below sees fresh data.
        this.dataBuff.clear();
        this.checksumBuff.clear();
        long dataSkipped = this.dataIn.skip(toskip);
        if (dataSkipped != toskip) {
            throw new IOException("skip error in data input stream");
        }
        long checkSumOffset = dataSkipped / (long)this.bytesPerChecksum * (long)this.checksumSize;
        if (checkSumOffset > 0L) {
            long skipped = this.checksumIn.skip(checkSumOffset);
            if (skipped != checkSumOffset) {
                throw new IOException("skip error in checksum input stream");
            }
        }
        if (this.skipBuf == null) {
            this.skipBuf = new byte[this.bytesPerChecksum];
        }
        assert (this.skipBuf.length == this.bytesPerChecksum);
        assert (this.offsetFromChunkBoundary < this.bytesPerChecksum);
        int ret = this.read(this.skipBuf, 0, this.offsetFromChunkBoundary);
        if (ret == -1) {
            return toskip;
        }
        return toskip + (long)ret;
    }

    /**
     * Closes both streams, returns the pooled buffers and clears the reader's state.
     *
     * <p>Every cleanup step runs even if closing a stream fails, so a failing data stream can
     * no longer leave the checksum stream open or the buffers outside the pool. Buffers are set
     * to null once returned, so calling this again does not return them twice.
     *
     * @throws IOException the first failure reported while closing the streams, rethrown after
     *         all cleanup has been attempted
     */
    @Override
    public synchronized void close() throws IOException {
        IOException failure = null;
        try {
            this.dataIn.close();
        }
        catch (IOException e) {
            failure = e;
        }
        if (this.checksumIn != null) {
            try {
                this.checksumIn.close();
            }
            catch (IOException e) {
                if (failure == null) {
                    failure = e;
                }
            }
        }
        if (this.dataBuff != null) {
            bufferPool.returnBuffer(this.dataBuff);
            this.dataBuff = null;
        }
        if (this.checksumBuff != null) {
            bufferPool.returnBuffer(this.checksumBuff);
            this.checksumBuff = null;
        }
        this.startOffset = -1L;
        this.checksum = null;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Delegates to {@code BlockReaderUtil.readAll} with this reader as the source.
     *
     * @param buf destination array
     * @param offset offset in {@code buf} at which to store data
     * @param len number of bytes requested
     * @return whatever {@code BlockReaderUtil.readAll} reports
     * @throws IOException if the underlying reads fail
     */
    @Override
    public int readAll(byte[] buf, int offset, int len) throws IOException {
        final int bytesRead = BlockReaderUtil.readAll(this, buf, offset, len);
        return bytesRead;
    }

    /**
     * Delegates to {@code BlockReaderUtil.readFully} with this reader as the source.
     *
     * @param buf destination array
     * @param off offset in {@code buf} at which to store data
     * @param len number of bytes requested
     * @throws IOException if the delegate fails
     */
    @Override
    public void readFully(byte[] buf, int off, int len) throws IOException {
        BlockReaderUtil.readFully(this, buf, off, len);
    }

    /**
     * Always returns null; this reader works on file streams and keeps no {@code Socket}.
     *
     * @return null
     */
    @Override
    public Socket takeSocket() {
        return null;
    }

    /**
     * Always reports false for this reader.
     *
     * @return false
     */
    @Override
    public boolean hasSentStatusCode() {
        return false;
    }

    /**
     * Per-datanode state: a lazily created RPC proxy and a bounded cache of local path
     * information per block. The cache is an access-ordered {@code LinkedHashMap} wrapped in
     * {@code Collections.synchronizedMap}; once it holds more than {@code CACHE_SIZE} entries
     * the eldest entry is dropped on insertion.
     */
    private static class LocalDatanodeInfo {
        /** Maximum number of entries kept in the block path cache. */
        private static final int CACHE_SIZE = 10000;
        /** Load factor of the backing hash table; also used to derive its initial capacity. */
        private static final float HASH_TABLE_LOAD_FACTOR = 0.75f;

        private ClientDatanodeProtocol proxy = null;
        private final Map<ExtendedBlock, BlockLocalPathInfo> cache;

        LocalDatanodeInfo() {
            // Size the table so CACHE_SIZE entries fit without exceeding the load factor.
            int hashTableCapacity = (int)Math.ceil(CACHE_SIZE / HASH_TABLE_LOAD_FACTOR) + 1;
            this.cache = Collections.synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(hashTableCapacity, HASH_TABLE_LOAD_FACTOR, true){
                private static final long serialVersionUID = 1L;

                @Override
                protected boolean removeEldestEntry(Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
                    return this.size() > CACHE_SIZE;
                }
            });
        }

        /**
         * Returns the proxy for this datanode, creating it on first use.
         *
         * @throws IOException if the proxy cannot be created
         */
        private synchronized ClientDatanodeProtocol getDatanodeProxy(DatanodeInfo node, Configuration conf, int socketTimeout) throws IOException {
            if (this.proxy == null) {
                this.proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf, socketTimeout);
            }
            return this.proxy;
        }

        /** Stops and forgets the current proxy, if any, so the next request creates a new one. */
        private synchronized void resetDatanodeProxy() {
            if (null != this.proxy) {
                RPC.stopProxy(this.proxy);
                this.proxy = null;
            }
        }

        /** Returns the cached path information for {@code b}, or null when none is cached. */
        private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
            return this.cache.get(b);
        }

        /** Caches {@code info} as the path information for {@code b}. */
        private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
            this.cache.put(b, info);
        }

        /** Drops any cached path information for {@code b}. */
        private void removeBlockLocalPathInfo(ExtendedBlock b) {
            this.cache.remove(b);
        }
    }
}

