/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.source.taildir;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.gson.stream.JsonReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.source.taildir.TailFile;
import org.apache.flume.source.taildir.TaildirMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReliableTaildirEventReader
implements ReliableEventReader {
    private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class);
    private final List<TaildirMatcher> taildirCache;
    private final Table<String, String, String> headerTable;
    private TailFile currentFile = null;
    private Map<Long, TailFile> tailFiles = Maps.newHashMap();
    private long updateTime;
    private boolean addByteOffset;
    private boolean cachePatternMatching;
    private boolean committed = true;
    private final boolean annotateFileName;
    private final String fileNameHeader;

    private ReliableTaildirEventReader(Map<String, String> filePaths, Table<String, String, String> headerTable, String positionFilePath, boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching, boolean annotateFileName, String fileNameHeader) throws IOException {
        Preconditions.checkNotNull(filePaths);
        Preconditions.checkNotNull((Object)positionFilePath);
        if (logger.isDebugEnabled()) {
            logger.debug("Initializing {} with directory={}, metaDir={}", new Object[]{ReliableTaildirEventReader.class.getSimpleName(), filePaths});
        }
        ArrayList taildirCache = Lists.newArrayList();
        for (Map.Entry<String, String> e : filePaths.entrySet()) {
            taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching));
        }
        logger.info("taildirCache: " + ((Object)taildirCache).toString());
        logger.info("headerTable: " + headerTable.toString());
        this.taildirCache = taildirCache;
        this.headerTable = headerTable;
        this.addByteOffset = addByteOffset;
        this.cachePatternMatching = cachePatternMatching;
        this.annotateFileName = annotateFileName;
        this.fileNameHeader = fileNameHeader;
        this.updateTailFiles(skipToEnd);
        logger.info("Updating position from position file: " + positionFilePath);
        this.loadPositionFile(positionFilePath);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Could not resolve type clashes
     */
    public void loadPositionFile(String filePath) {
        FileReader fr = null;
        JsonReader jr = null;
        try {
            fr = new FileReader(filePath);
            jr = new JsonReader((Reader)fr);
            jr.beginArray();
            while (jr.hasNext()) {
                Long inode = null;
                Long pos = null;
                String path = null;
                jr.beginObject();
                while (jr.hasNext()) {
                    switch (jr.nextName()) {
                        case "inode": {
                            inode = jr.nextLong();
                            break;
                        }
                        case "pos": {
                            pos = jr.nextLong();
                            break;
                        }
                        case "file": {
                            path = jr.nextString();
                        }
                    }
                }
                jr.endObject();
                for (Object v : Arrays.asList(inode, pos, path)) {
                    Preconditions.checkNotNull(v, (Object)("Detected missing value in position file. inode: " + inode + ", pos: " + pos + ", path: " + path));
                }
                TailFile tf = this.tailFiles.get(inode);
                if (tf != null && tf.updatePos(path, inode, pos)) {
                    this.tailFiles.put(inode, tf);
                    continue;
                }
                logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos);
            }
            jr.endArray();
        }
        catch (FileNotFoundException e) {
            logger.info("File not found: " + filePath + ", not updating position");
        }
        catch (IOException e) {
            logger.error("Failed loading positionFile: " + filePath, (Throwable)e);
        }
        finally {
            try {
                if (fr != null) {
                    fr.close();
                }
                if (jr != null) {
                    jr.close();
                }
            }
            catch (IOException e) {
                logger.error("Error: " + e.getMessage(), (Throwable)e);
            }
        }
    }

    public Map<Long, TailFile> getTailFiles() {
        return this.tailFiles;
    }

    public void setCurrentFile(TailFile currentFile) {
        this.currentFile = currentFile;
    }

    public Event readEvent() throws IOException {
        List<Event> events = this.readEvents(1);
        if (events.isEmpty()) {
            return null;
        }
        return events.get(0);
    }

    public List<Event> readEvents(int numEvents) throws IOException {
        return this.readEvents(numEvents, false);
    }

    @VisibleForTesting
    public List<Event> readEvents(TailFile tf, int numEvents) throws IOException {
        this.setCurrentFile(tf);
        return this.readEvents(numEvents, true);
    }

    public List<Event> readEvents(int numEvents, boolean backoffWithoutNL) throws IOException {
        List<Event> events;
        if (!this.committed) {
            if (this.currentFile == null) {
                throw new IllegalStateException("current file does not exist. " + this.currentFile.getPath());
            }
            logger.info("Last read was never committed - resetting position");
            long lastPos = this.currentFile.getPos();
            this.currentFile.updateFilePos(lastPos);
        }
        if ((events = this.currentFile.readEvents(numEvents, backoffWithoutNL, this.addByteOffset)).isEmpty()) {
            return events;
        }
        Map<String, String> headers = this.currentFile.getHeaders();
        if (this.annotateFileName || headers != null && !headers.isEmpty()) {
            for (Event event : events) {
                if (headers != null && !headers.isEmpty()) {
                    event.getHeaders().putAll(headers);
                }
                if (!this.annotateFileName) continue;
                event.getHeaders().put(this.fileNameHeader, this.currentFile.getPath());
            }
        }
        this.committed = false;
        return events;
    }

    public void close() throws IOException {
        for (TailFile tf : this.tailFiles.values()) {
            if (tf.getRaf() == null) continue;
            tf.getRaf().close();
        }
    }

    public void commit() throws IOException {
        if (!this.committed && this.currentFile != null) {
            long pos = this.currentFile.getLineReadPos();
            this.currentFile.setPos(pos);
            this.currentFile.setLastUpdated(this.updateTime);
            this.committed = true;
        }
    }

    public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
        this.updateTime = System.currentTimeMillis();
        ArrayList updatedInodes = Lists.newArrayList();
        for (TaildirMatcher taildir : this.taildirCache) {
            Map headers = this.headerTable.row((Object)taildir.getFileGroup());
            for (File f : taildir.getMatchingFiles()) {
                long inode;
                try {
                    inode = this.getInode(f);
                }
                catch (NoSuchFileException e) {
                    logger.info("File has been deleted in the meantime: " + e.getMessage());
                    continue;
                }
                TailFile tf = this.tailFiles.get(inode);
                if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
                    long startPos = skipToEnd ? f.length() : 0L;
                    tf = this.openFile(f, headers, inode, startPos);
                } else {
                    boolean updated;
                    boolean bl = updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
                    if (updated) {
                        if (tf.getRaf() == null) {
                            tf = this.openFile(f, headers, inode, tf.getPos());
                        }
                        if (f.length() < tf.getPos()) {
                            logger.info("Pos " + tf.getPos() + " is larger than file size! Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
                            tf.updatePos(tf.getPath(), inode, 0L);
                        }
                    }
                    tf.setNeedTail(updated);
                }
                this.tailFiles.put(inode, tf);
                updatedInodes.add(inode);
            }
        }
        return updatedInodes;
    }

    public List<Long> updateTailFiles() throws IOException {
        return this.updateTailFiles(false);
    }

    private long getInode(File file) throws IOException {
        long inode = (Long)Files.getAttribute(file.toPath(), "unix:ino", new LinkOption[0]);
        return inode;
    }

    private TailFile openFile(File file, Map<String, String> headers, long inode, long pos) {
        try {
            logger.info("Opening file: " + file + ", inode: " + inode + ", pos: " + pos);
            return new TailFile(file, headers, inode, pos);
        }
        catch (IOException e) {
            throw new FlumeException("Failed opening file: " + file, (Throwable)e);
        }
    }

    public static class Builder {
        private Map<String, String> filePaths;
        private Table<String, String, String> headerTable;
        private String positionFilePath;
        private boolean skipToEnd;
        private boolean addByteOffset;
        private boolean cachePatternMatching;
        private Boolean annotateFileName = false;
        private String fileNameHeader = "file";

        public Builder filePaths(Map<String, String> filePaths) {
            this.filePaths = filePaths;
            return this;
        }

        public Builder headerTable(Table<String, String, String> headerTable) {
            this.headerTable = headerTable;
            return this;
        }

        public Builder positionFilePath(String positionFilePath) {
            this.positionFilePath = positionFilePath;
            return this;
        }

        public Builder skipToEnd(boolean skipToEnd) {
            this.skipToEnd = skipToEnd;
            return this;
        }

        public Builder addByteOffset(boolean addByteOffset) {
            this.addByteOffset = addByteOffset;
            return this;
        }

        public Builder cachePatternMatching(boolean cachePatternMatching) {
            this.cachePatternMatching = cachePatternMatching;
            return this;
        }

        public Builder annotateFileName(boolean annotateFileName) {
            this.annotateFileName = annotateFileName;
            return this;
        }

        public Builder fileNameHeader(String fileNameHeader) {
            this.fileNameHeader = fileNameHeader;
            return this;
        }

        public ReliableTaildirEventReader build() throws IOException {
            return new ReliableTaildirEventReader(this.filePaths, this.headerTable, this.positionFilePath, this.skipToEnd, this.addByteOffset, this.cachePatternMatching, this.annotateFileName, this.fileNameHeader);
        }
    }
}

