package org.apache.flume.client.avro;

import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.flume.FlumeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/client/avro/SpoolingFileLineReader.class */
/**
 * A {@link LineReader} that consumes the files of a spooling directory one
 * line at a time.
 *
 * <p>Files are processed oldest first (by last-modified time, then by name).
 * Once a file has been read to its end it is "retired" by renaming it with
 * {@link #completedSuffix} appended, after which it is ignored by later
 * directory scans. Hidden files (name starting with {@code "."}), files that
 * already carry the completed suffix and sub-directories are never read.
 *
 * <p>Reads are transactional: lines handed out by {@link #readLines(int)} are
 * handed out again by the next read unless {@link #commit()} was called in
 * between. This is implemented with {@link BufferedReader#mark(int)} /
 * {@link BufferedReader#reset()}, using a read-ahead limit of
 * {@code bufferMaxLines * bufferMaxLineLength} characters.
 *
 * <p>This class performs no synchronization of its own.
 */
public class SpoolingFileLineReader implements LineReader {
    private static final Logger logger = LoggerFactory.getLogger(SpoolingFileLineReader.class);

    /** Number of leading characters of an over-long line that are written to the log. */
    private static final int OVERFLOW_LINE_PRINT_CHARS = 30;

    private final File directory;

    /** Suffix appended to a file's name once it has been fully read. */
    public String completedSuffix;

    private final int bufferMaxLines;
    private final int bufferMaxLineLength;

    /** File currently being read, if any. */
    private Optional<FileInfo> currentFile = Optional.absent();

    /** File the most recent read was served from, if any. */
    private Optional<FileInfo> lastFileRead = Optional.absent();

    /** {@code false} while lines have been handed out that were not yet committed. */
    private boolean committed = true;

    /** Set once an over-long line has been encountered; every later call then fails. */
    private boolean disabled = false;

    /**
     * An open file together with the size and modification time it had when it
     * was opened, so that changes made while it is being read can be detected.
     */
    public class FileInfo {
        private final File file;
        private final long length;
        private final long lastModified;
        private final BufferedReader reader;

        /**
         * @param file the file being read; its length and last-modified time are captured here
         * @param bufferedReader reader positioned on {@code file}
         */
        public FileInfo(File file, BufferedReader bufferedReader) {
            this.file = file;
            this.length = file.length();
            this.lastModified = file.lastModified();
            this.reader = bufferedReader;
        }

        /** @return the file length recorded at construction time */
        public long getLength() {
            return this.length;
        }

        /** @return the last-modified time recorded at construction time */
        public long getLastModified() {
            return this.lastModified;
        }

        /** @return the reader used to consume the file */
        public BufferedReader getReader() {
            return this.reader;
        }

        /** @return the file being read */
        public File getFile() {
            return this.file;
        }
    }

    /**
     * Creates a reader over the given spooling directory.
     *
     * <p>As a permission check, a temporary file is created, written, read back
     * and deleted inside the directory.
     *
     * @param file the spooling directory; must exist and be a directory
     * @param str suffix appended to the name of every completed file
     * @param i maximum number of lines between two commits; must be positive
     * @param i2 maximum length of a single line in characters; must be positive
     * @throws NullPointerException if {@code file} or {@code str} is null
     * @throws IllegalStateException if any other precondition above is violated
     * @throws FlumeException if files cannot be created, written or read in the directory
     */
    public SpoolingFileLineReader(File file, String str, int i, int i2) {
        Preconditions.checkNotNull(file);
        Preconditions.checkState(file.exists(), "Directory does not exist: " + file.getAbsolutePath());
        Preconditions.checkState(file.isDirectory(), "Path is not a directory: " + file.getAbsolutePath());
        Preconditions.checkState(i > 0);
        Preconditions.checkState(i2 > 0);
        // A null suffix would otherwise only surface later, as an NPE during the directory scan.
        Preconditions.checkNotNull(str);
        // The product is used as an int buffer size / read-ahead limit; reject silent overflow.
        Preconditions.checkState((long) i * (long) i2 <= Integer.MAX_VALUE,
                "Buffer size overflows an int: " + i + " lines * " + i2 + " characters");

        File probe = null;
        try {
            probe = File.createTempFile("flume", "test", file);
            Files.write("testing flume file permissions\n", probe, Charsets.UTF_8);
            Files.readLines(probe, Charsets.UTF_8);
        } catch (IOException e) {
            throw new FlumeException("Unable to read and modify files in the spooling directory: " + file, e);
        } finally {
            // Always remove the probe: a leftover file in the spool directory would be ingested as data.
            if (probe != null && probe.exists() && !probe.delete()) {
                logger.warn("Unable to delete permission probe file {}", probe);
            }
        }

        this.directory = file;
        this.completedSuffix = str;
        this.bufferMaxLines = i;
        this.bufferMaxLineLength = i2;
    }

    /**
     * @return absolute path of the file the most recent read was served from,
     *         or {@code null} if nothing has been read yet
     */
    public String getLastFileRead() {
        if (this.lastFileRead.isPresent()) {
            return this.lastFileRead.get().getFile().getAbsolutePath();
        }
        return null;
    }

    /**
     * Reads a single line; equivalent to {@code readLines(1)}.
     *
     * @return the line, or {@code null} if no line was available
     * @throws IllegalStateException if the reader has been disabled
     * @throws IOException if reading or retiring a file fails
     */
    @Override // org.apache.flume.client.avro.LineReader
    public String readLine() throws IOException {
        ensureEnabled();
        List<String> lines = readLines(1);
        return lines.isEmpty() ? null : lines.get(0);
    }

    /**
     * Marks everything handed out so far as consumed, so that it is not
     * returned again by the next read. Calling this when no file is open
     * (for example after a read that found nothing) is a no-op.
     *
     * @throws IllegalStateException if the reader has been disabled
     * @throws IOException if the underlying reader cannot be marked
     */
    public void commit() throws IOException {
        ensureEnabled();
        // Optional.get() throws on an absent value; with no open file there is nothing to mark.
        if (this.currentFile.isPresent()) {
            this.currentFile.get().getReader().mark(this.bufferMaxLines * this.bufferMaxLineLength);
        }
        this.committed = true;
    }

    /**
     * Reads up to {@code i} lines from the current file, rolling to the next
     * file when the current one is exhausted.
     *
     * <p>If the previous read was never committed, the reader is first reset
     * to the last committed position, so the same lines are returned again.
     * Lines from two different files are never mixed in one result.
     *
     * @param i maximum number of lines to return
     * @return the lines read; empty if none were available
     * @throws IllegalStateException if the reader has been disabled, or if a
     *         retired file was modified, resized, or clashes with an existing completed file
     * @throws FlumeException if a line exceeds the configured maximum length
     *         (the reader is disabled from then on) or a file cannot be renamed
     * @throws IOException if reading fails
     */
    @Override // org.apache.flume.client.avro.LineReader
    public List<String> readLines(int i) throws IOException {
        ensureEnabled();
        if (this.committed) {
            if (!this.currentFile.isPresent()) {
                this.currentFile = getNextFile();
            }
            if (!this.currentFile.isPresent()) {
                return Collections.emptyList();
            }
        } else {
            if (!this.currentFile.isPresent()) {
                throw new IllegalStateException("File should not roll when  commit is outstanding.");
            }
            logger.info("Last read was never comitted - resetting mark position.");
            this.currentFile.get().getReader().reset();
        }

        String line = this.currentFile.get().getReader().readLine();
        if (line == null) {
            // Current file is exhausted: retire it and continue with the next one, if any.
            retireCurrentFile();
            this.currentFile = getNextFile();
            if (!this.currentFile.isPresent()) {
                return Collections.emptyList();
            }
            line = this.currentFile.get().getReader().readLine();
        }

        List<String> lines = Lists.newArrayList();
        while (line != null) {
            if (line.length() > this.bufferMaxLineLength) {
                logger.error("Found line longer than " + this.bufferMaxLineLength + " characters, cannot make progress.");
                logger.error("Invalid line starts with: " + line.substring(0, Math.min(OVERFLOW_LINE_PRINT_CHARS, line.length())));
                this.disabled = true;
                throw new FlumeException("Encoutered line that was too long.");
            }
            lines.add(line);
            if (lines.size() == i) {
                break;
            }
            line = this.currentFile.get().getReader().readLine();
        }

        // Only a read that handed out lines leaves something to commit. Flagging an empty
        // read (e.g. an empty file) as uncommitted would, once that file is retired and the
        // directory is empty, leave the reader uncommitted with no current file, so that
        // every subsequent call fails.
        if (!lines.isEmpty()) {
            this.committed = false;
        }
        this.lastFileRead = this.currentFile;
        return lines;
    }

    /** Fails if an over-long line has previously disabled this reader. */
    private void ensureEnabled() {
        if (this.disabled) {
            throw new IllegalStateException("Reader has been disabled.");
        }
    }

    /**
     * Closes the current file and renames it with the completed suffix.
     *
     * <p>If the completed file already exists: on Windows the source is deleted
     * when both files have identical content; in every other case this is
     * treated as a violation of the spooling contract.
     *
     * @throws IllegalStateException if the file changed while being read, or the
     *         completed name is already taken by a different file
     * @throws FlumeException if the rename fails
     * @throws IOException if closing the reader or comparing the files fails
     */
    private void retireCurrentFile() throws IOException {
        Preconditions.checkState(this.currentFile.isPresent());
        FileInfo current = this.currentFile.get();
        String sourcePath = current.getFile().getAbsolutePath();
        String completedPath = sourcePath + this.completedSuffix;
        logger.info("Preparing to move file " + sourcePath + " to " + completedPath);
        current.getReader().close();

        File source = new File(sourcePath);
        if (source.lastModified() != current.getLastModified()) {
            throw new IllegalStateException("File has been modified since being read: " + sourcePath);
        }
        if (source.length() != current.getLength()) {
            throw new IllegalStateException("File has changed size since being read: " + sourcePath);
        }

        File completed = new File(completedPath);
        if (completed.exists()) {
            // Locale.ROOT: default-locale lower-casing (e.g. Turkish) turns "WINDOWS" into a
            // string that no longer contains "win".
            boolean onWindows = System.getProperty("os.name")
                    .toLowerCase(java.util.Locale.ROOT).indexOf("win") >= 0;
            if (!onWindows || !Files.equal(current.getFile(), completed)) {
                throw new IllegalStateException("File name has been re-used with different files. Spooling assumption violated for " + completedPath);
            }
            logger.warn("Completed file " + completedPath + " already exists, but files match, so continuing.");
            if (!source.delete()) {
                logger.error("Unable to delete file " + source.getAbsolutePath() + ". It will likely be ingested another time.");
            }
            return;
        }

        if (!source.renameTo(completed)) {
            throw new FlumeException("Unable to move " + sourcePath + " to " + completedPath + ". This will likely cause duplicate events. Please verify that flume has sufficient permissions to perform these operations.");
        }
    }

    /**
     * Opens the oldest eligible file in the spooling directory.
     *
     * @return the opened file, or absent if there is no eligible file, the
     *         directory cannot be listed, or the chosen file cannot be opened
     */
    private Optional<FileInfo> getNextFile() {
        File[] candidates = this.directory.listFiles(new FileFilter() {
            @Override // java.io.FileFilter
            public boolean accept(File candidate) {
                // A directory cannot be opened as a file; if it sorted first it would be
                // selected, fail to open, and be selected again on every later call.
                if (candidate.isDirectory()) {
                    return false;
                }
                String name = candidate.getName();
                return !name.endsWith(completedSuffix) && !name.startsWith(".");
            }
        });
        // listFiles returns null (not an empty array) on an I/O error or if the directory vanished.
        if (candidates == null) {
            logger.error("Unable to list files in spooling directory: {}", this.directory);
            return Optional.absent();
        }
        if (candidates.length == 0) {
            return Optional.absent();
        }

        // Only the oldest file is needed, so take the minimum instead of sorting everything.
        File next = Collections.min(Arrays.asList(candidates), new Comparator<File>() {
            @Override // java.util.Comparator
            public int compare(File left, File right) {
                int byAge = Long.compare(left.lastModified(), right.lastModified());
                return byAge != 0 ? byAge : left.getName().compareTo(right.getName());
            }
        });

        int readAheadLimit = this.bufferMaxLines * this.bufferMaxLineLength;
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(next), readAheadLimit);
            reader.mark(readAheadLimit);
            return Optional.of(new FileInfo(next, reader));
        } catch (FileNotFoundException e) {
            logger.warn("Could not find file: " + next, e);
            return Optional.absent();
        } catch (IOException e) {
            logger.error("Exception opening file: " + next, e);
            // The file was opened but could not be marked; do not leak the handle.
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException ignored) {
                    // Best effort: the original failure has already been logged.
                }
            }
            return Optional.absent();
        }
    }

    /**
     * Releases the file that is currently open, if any. The file is not
     * retired, so it stays eligible for a later directory scan.
     *
     * @throws IOException if closing the underlying reader fails
     */
    @Override // org.apache.flume.client.avro.LineReader
    public void close() throws IOException {
        if (this.currentFile.isPresent()) {
            try {
                this.currentFile.get().getReader().close();
            } finally {
                this.currentFile = Optional.absent();
            }
        }
    }
}
