/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.TestCase;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Before;

public class TestFileConcurrentReader
extends TestCase {
    private static final Logger LOG = Logger.getLogger(TestFileConcurrentReader.class);
    static final long seed = 3735928559L;
    static final int blockSize = 8192;
    private static final int DEFAULT_WRITE_SIZE = 1025;
    private static final int SMALL_WRITE_SIZE = 61;
    private Configuration conf;
    private MiniDFSCluster cluster;
    private FileSystem fileSystem;

    public TestFileConcurrentReader() {
        ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
        ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
    }

    private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) throws IOException {
        System.out.println("createFile: Created " + name + " with " + repl + " replica.");
        FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf().getInt("io.file.buffer.size", 4096), (short)repl, 8192L);
        return stm;
    }

    @Before
    protected void setUp() throws IOException {
        this.conf = new Configuration();
        this.init(this.conf);
    }

    private void init(Configuration conf) throws IOException {
        if (this.cluster != null) {
            this.cluster.shutdown();
        }
        this.cluster = new MiniDFSCluster(conf, 1, true, null);
        this.cluster.waitClusterUp();
        this.fileSystem = this.cluster.getFileSystem();
    }

    private void writeFileAndSync(FSDataOutputStream stm, int size) throws IOException {
        byte[] buffer = this.generateSequentialBytes(0, size);
        stm.write(buffer, 0, size);
        stm.sync();
    }

    private void checkCanRead(FileSystem fileSys, Path path, int numBytes) throws IOException {
        this.waitForBlocks(fileSys, path);
        this.assertBytesAvailable(fileSys, path, numBytes);
    }

    private void assertBytesAvailable(FileSystem fileSystem, Path path, int numBytes) throws IOException {
        byte[] buffer = new byte[numBytes];
        FSDataInputStream inputStream = fileSystem.open(path);
        IOUtils.readFully((InputStream)inputStream, (byte[])buffer, (int)0, (int)numBytes);
        TestFileConcurrentReader.assertTrue((String)"unable to validate bytes", (boolean)this.validateSequentialBytes(buffer, 0, numBytes));
    }

    private void waitForBlocks(FileSystem fileSys, Path name) throws IOException {
        boolean done = false;
        while (!done) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            done = true;
            BlockLocation[] locations = fileSys.getFileBlockLocations(fileSys.getFileStatus(name), 0L, 8192L);
            if (locations.length >= 1) continue;
            done = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testUnfinishedBlockRead() throws IOException {
        try {
            Path path = new Path("/");
            System.out.println("Path : \"" + path.toString() + "\"");
            System.out.println(this.fileSystem.getFileStatus(path).isDir());
            TestFileConcurrentReader.assertTrue((String)"/ should be a directory", (boolean)this.fileSystem.getFileStatus(path).isDir());
            Path file1 = new Path("/unfinished-block");
            FSDataOutputStream stm = TestFileCreation.createFile(this.fileSystem, file1, 1);
            int partialBlockSize = 4096;
            this.writeFileAndSync(stm, partialBlockSize);
            this.checkCanRead(this.fileSystem, file1, partialBlockSize);
            stm.close();
        }
        finally {
            this.cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testUnfinishedBlockPacketBufferOverrun() throws IOException {
        try {
            Path path = new Path("/");
            System.out.println("Path : \"" + path.toString() + "\"");
            System.out.println(this.fileSystem.getFileStatus(path).isDir());
            TestFileConcurrentReader.assertTrue((String)"/ should be a directory", (boolean)this.fileSystem.getFileStatus(path).isDir());
            Path file1 = new Path("/unfinished-block");
            FSDataOutputStream stm = TestFileCreation.createFile(this.fileSystem, file1, 1);
            int bytesPerChecksum = this.conf.getInt("io.bytes.per.checksum", 512);
            int partialBlockSize = bytesPerChecksum - 1;
            this.writeFileAndSync(stm, partialBlockSize);
            this.checkCanRead(this.fileSystem, file1, partialBlockSize);
            stm.close();
        }
        finally {
            this.cluster.shutdown();
        }
    }

    public void testUnfinishedBlockCRCErrorTransferTo() throws IOException {
        this.runTestUnfinishedBlockCRCError(true, SyncType.SYNC, 1025);
    }

    public void testUnfinishedBlockCRCErrorTransferToVerySmallWrite() throws IOException {
        this.runTestUnfinishedBlockCRCError(true, SyncType.SYNC, 61);
    }

    public void _testUnfinishedBlockCRCErrorTransferToAppend() throws IOException {
        this.runTestUnfinishedBlockCRCError(true, SyncType.APPEND, 1025);
    }

    public void testUnfinishedBlockCRCErrorNormalTransfer() throws IOException {
        this.runTestUnfinishedBlockCRCError(false, SyncType.SYNC, 1025);
    }

    public void testUnfinishedBlockCRCErrorNormalTransferVerySmallWrite() throws IOException {
        this.runTestUnfinishedBlockCRCError(false, SyncType.SYNC, 61);
    }

    public void _testUnfinishedBlockCRCErrorNormalTransferAppend() throws IOException {
        this.runTestUnfinishedBlockCRCError(false, SyncType.APPEND, 1025);
    }

    private void runTestUnfinishedBlockCRCError(boolean transferToAllowed, SyncType syncType, int writeSize) throws IOException {
        this.runTestUnfinishedBlockCRCError(transferToAllowed, syncType, writeSize, new Configuration());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runTestUnfinishedBlockCRCError(boolean transferToAllowed, final SyncType syncType, final int writeSize, Configuration conf) throws IOException {
        try {
            conf.setBoolean("dfs.support.append", syncType == SyncType.APPEND);
            conf.setBoolean("dfs.datanode.transferTo.allowed", transferToAllowed);
            this.init(conf);
            final Path file = new Path("/block-being-written-to");
            int numWrites = 2000;
            final AtomicBoolean writerDone = new AtomicBoolean(false);
            final AtomicBoolean writerStarted = new AtomicBoolean(false);
            final AtomicBoolean error = new AtomicBoolean(false);
            final FSDataOutputStream initialOutputStream = this.fileSystem.create(file);
            Thread writer = new Thread(new Runnable(){
                private FSDataOutputStream outputStream;
                {
                    this.outputStream = initialOutputStream;
                }

                @Override
                public void run() {
                    try {
                        for (int i = 0; !error.get() && i < 2000; ++i) {
                            try {
                                byte[] writeBuf = TestFileConcurrentReader.this.generateSequentialBytes(i * writeSize, writeSize);
                                this.outputStream.write(writeBuf);
                                if (syncType == SyncType.SYNC) {
                                    this.outputStream.sync();
                                } else {
                                    this.outputStream.close();
                                    this.outputStream = TestFileConcurrentReader.this.fileSystem.append(file);
                                }
                                writerStarted.set(true);
                                continue;
                            }
                            catch (IOException e) {
                                error.set(true);
                                LOG.error((Object)String.format("error writing to file", new Object[0]));
                            }
                        }
                        this.outputStream.close();
                        writerDone.set(true);
                    }
                    catch (Exception e) {
                        LOG.error((Object)"error in writer", (Throwable)e);
                        throw new RuntimeException(e);
                    }
                }
            });
            Thread tailer = new Thread(new Runnable(){

                /*
                 * Enabled force condition propagation
                 * Lifted jumps to return sites
                 */
                @Override
                public void run() {
                    try {
                        long startPos = 0L;
                        while (!writerDone.get() && !error.get()) {
                            if (!writerStarted.get()) continue;
                            try {
                                startPos = TestFileConcurrentReader.this.tailFile(file, startPos);
                            }
                            catch (IOException e) {
                                LOG.error((Object)String.format("error tailing file %s", file), (Throwable)e);
                                throw new RuntimeException(e);
                                return;
                            }
                        }
                    }
                    catch (RuntimeException e) {
                        if (e.getCause() instanceof ChecksumException) {
                            error.set(true);
                        }
                        LOG.error((Object)"error in tailer", (Throwable)e);
                        throw e;
                    }
                }
            });
            writer.start();
            tailer.start();
            try {
                writer.join();
                tailer.join();
                TestFileConcurrentReader.assertFalse((String)"error occurred, see log above", (boolean)error.get());
            }
            catch (InterruptedException e) {
                LOG.info((Object)"interrupted waiting for writer or tailer to complete");
                Thread.currentThread().interrupt();
            }
            initialOutputStream.close();
        }
        finally {
            this.cluster.shutdown();
        }
    }

    private byte[] generateSequentialBytes(int start, int length) {
        byte[] result = new byte[length];
        for (int i = 0; i < length; ++i) {
            result[i] = (byte)((start + i) % 127);
        }
        return result;
    }

    private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {
        for (int i = 0; i < len; ++i) {
            int expected = (i + startPos) % 127;
            if (buf[i] % 127 == expected) continue;
            LOG.error((Object)String.format("at position [%d], got [%d] and expected [%d]", startPos, buf[i], expected));
            return false;
        }
        return true;
    }

    private long tailFile(Path file, long startPos) throws IOException {
        int read;
        long numRead = 0L;
        FSDataInputStream inputStream = this.fileSystem.open(file);
        inputStream.seek(startPos);
        int len = 4096;
        byte[] buf = new byte[len];
        while ((read = inputStream.read(buf)) > -1) {
            LOG.info((Object)String.format("read %d bytes", read));
            if (!this.validateSequentialBytes(buf, (int)(startPos + numRead), read)) {
                LOG.error((Object)String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
                throw new ChecksumException(String.format("unable to validate bytes", new Object[0]), startPos);
            }
            numRead += (long)read;
        }
        inputStream.close();
        return numRead + startPos - 1L;
    }

    public void _testClientReadsPastBlockEnd() throws IOException {
    }

    private static enum SyncType {
        SYNC,
        APPEND;

    }
}

