/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.source.taildir;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.taildir.ReliableTaildirEventReader;
import org.apache.flume.source.taildir.TailFile;
import org.apache.flume.source.taildir.TaildirSourceConfigurationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaildirSource
extends AbstractSource
implements PollableSource,
Configurable,
BatchSizeSupported {
    private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class);
    private Map<String, String> filePaths;
    private Table<String, String, String> headerTable;
    private int batchSize;
    private String positionFilePath;
    private boolean skipToEnd;
    private boolean byteOffsetHeader;
    private SourceCounter sourceCounter;
    private ReliableTaildirEventReader reader;
    private ScheduledExecutorService idleFileChecker;
    private ScheduledExecutorService positionWriter;
    private int retryInterval = 1000;
    private int maxRetryInterval = 5000;
    private int idleTimeout;
    private int checkIdleInterval = 5000;
    private int writePosInitDelay = 5000;
    private int writePosInterval;
    private boolean cachePatternMatching;
    private List<Long> existingInodes = new CopyOnWriteArrayList<Long>();
    private List<Long> idleInodes = new CopyOnWriteArrayList<Long>();
    private Long backoffSleepIncrement;
    private Long maxBackOffSleepInterval;
    private boolean fileHeader;
    private String fileHeaderKey;
    private Long maxBatchCount;

    public synchronized void start() {
        logger.info("{} TaildirSource source starting with directory: {}", (Object)this.getName(), this.filePaths);
        try {
            this.reader = new ReliableTaildirEventReader.Builder().filePaths(this.filePaths).headerTable(this.headerTable).positionFilePath(this.positionFilePath).skipToEnd(this.skipToEnd).addByteOffset(this.byteOffsetHeader).cachePatternMatching(this.cachePatternMatching).annotateFileName(this.fileHeader).fileNameHeader(this.fileHeaderKey).build();
        }
        catch (IOException e) {
            throw new FlumeException("Error instantiating ReliableTaildirEventReader", (Throwable)e);
        }
        this.idleFileChecker = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
        this.idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(), this.idleTimeout, this.checkIdleInterval, TimeUnit.MILLISECONDS);
        this.positionWriter = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
        this.positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(), this.writePosInitDelay, this.writePosInterval, TimeUnit.MILLISECONDS);
        super.start();
        logger.debug("TaildirSource started");
        this.sourceCounter.start();
    }

    public synchronized void stop() {
        try {
            ExecutorService[] services;
            super.stop();
            for (ExecutorService service : services = new ExecutorService[]{this.idleFileChecker, this.positionWriter}) {
                service.shutdown();
                if (service.awaitTermination(1L, TimeUnit.SECONDS)) continue;
                service.shutdownNow();
            }
            this.writePosition();
            this.reader.close();
        }
        catch (InterruptedException e) {
            logger.info("Interrupted while awaiting termination", (Throwable)e);
        }
        catch (IOException e) {
            logger.info("Failed: " + e.getMessage(), (Throwable)e);
        }
        this.sourceCounter.stop();
        logger.info("Taildir source {} stopped. Metrics: {}", (Object)this.getName(), (Object)this.sourceCounter);
    }

    public String toString() {
        return String.format("Taildir source: { positionFile: %s, skipToEnd: %s, byteOffsetHeader: %s, idleTimeout: %s, writePosInterval: %s }", this.positionFilePath, this.skipToEnd, this.byteOffsetHeader, this.idleTimeout, this.writePosInterval);
    }

    public synchronized void configure(Context context) {
        String fileGroups = context.getString("filegroups");
        Preconditions.checkState((fileGroups != null ? 1 : 0) != 0, (Object)"Missing param: filegroups");
        this.filePaths = this.selectByKeys((Map<String, String>)context.getSubProperties("filegroups."), fileGroups.split("\\s+"));
        Preconditions.checkState((!this.filePaths.isEmpty() ? 1 : 0) != 0, (Object)"Mapping for tailing files is empty or invalid: 'filegroups.'");
        String homePath = System.getProperty("user.home").replace('\\', '/');
        this.positionFilePath = context.getString("positionFile", homePath + "/.flume/taildir_position.json");
        Path positionFile = Paths.get(this.positionFilePath, new String[0]);
        try {
            Files.createDirectories(positionFile.getParent(), new FileAttribute[0]);
        }
        catch (IOException e) {
            throw new FlumeException("Error creating positionFile parent directories", (Throwable)e);
        }
        this.headerTable = this.getTable(context, "headers.");
        this.batchSize = context.getInteger("batchSize", Integer.valueOf(100));
        this.skipToEnd = context.getBoolean("skipToEnd", Boolean.valueOf(false));
        this.byteOffsetHeader = context.getBoolean("byteOffsetHeader", Boolean.valueOf(false));
        this.idleTimeout = context.getInteger("idleTimeout", Integer.valueOf(120000));
        this.writePosInterval = context.getInteger("writePosInterval", Integer.valueOf(3000));
        this.cachePatternMatching = context.getBoolean("cachePatternMatching", Boolean.valueOf(true));
        this.backoffSleepIncrement = context.getLong("backoffSleepIncrement", Long.valueOf(1000L));
        this.maxBackOffSleepInterval = context.getLong("maxBackoffSleep", Long.valueOf(5000L));
        this.fileHeader = context.getBoolean("fileHeader", Boolean.valueOf(false));
        this.fileHeaderKey = context.getString("fileHeaderKey", "file");
        this.maxBatchCount = context.getLong("maxBatchCount", TaildirSourceConfigurationConstants.DEFAULT_MAX_BATCH_COUNT);
        if (this.maxBatchCount <= 0L) {
            this.maxBatchCount = TaildirSourceConfigurationConstants.DEFAULT_MAX_BATCH_COUNT;
            logger.warn("Invalid maxBatchCount specified, initializing source default maxBatchCount of {}", (Object)this.maxBatchCount);
        }
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(this.getName());
        }
    }

    public long getBatchSize() {
        return this.batchSize;
    }

    private Map<String, String> selectByKeys(Map<String, String> map, String[] keys) {
        HashMap result = Maps.newHashMap();
        for (String key : keys) {
            if (!map.containsKey(key)) continue;
            result.put(key, map.get(key));
        }
        return result;
    }

    private Table<String, String, String> getTable(Context context, String prefix) {
        HashBasedTable table = HashBasedTable.create();
        for (Map.Entry e : context.getSubProperties(prefix).entrySet()) {
            String[] parts = ((String)e.getKey()).split("\\.", 2);
            table.put((Object)parts[0], (Object)parts[1], (Object)((String)e.getValue()));
        }
        return table;
    }

    @VisibleForTesting
    protected SourceCounter getSourceCounter() {
        return this.sourceCounter;
    }

    public PollableSource.Status process() {
        PollableSource.Status status = PollableSource.Status.BACKOFF;
        try {
            this.existingInodes.clear();
            this.existingInodes.addAll(this.reader.updateTailFiles());
            for (long inode : this.existingInodes) {
                boolean hasMoreLines;
                TailFile tf = this.reader.getTailFiles().get(inode);
                if (!tf.needTail() || !(hasMoreLines = this.tailFileProcess(tf, true))) continue;
                status = PollableSource.Status.READY;
            }
            this.closeTailFiles();
        }
        catch (Throwable t) {
            logger.error("Unable to tail files", t);
            this.sourceCounter.incrementEventReadFail();
            status = PollableSource.Status.BACKOFF;
        }
        return status;
    }

    public long getBackOffSleepIncrement() {
        return this.backoffSleepIncrement;
    }

    public long getMaxBackOffSleepInterval() {
        return this.maxBackOffSleepInterval;
    }

    private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL) throws IOException, InterruptedException {
        long batchCount = 0L;
        while (true) {
            this.reader.setCurrentFile(tf);
            List<Event> events = this.reader.readEvents(this.batchSize, backoffWithoutNL);
            if (events.isEmpty()) {
                return false;
            }
            this.sourceCounter.addToEventReceivedCount((long)events.size());
            this.sourceCounter.incrementAppendBatchReceivedCount();
            try {
                this.getChannelProcessor().processEventBatch(events);
                this.reader.commit();
            }
            catch (ChannelException ex) {
                logger.warn("The channel is full or unexpected failure. The source will try again after " + this.retryInterval + " ms");
                this.sourceCounter.incrementChannelWriteFail();
                TimeUnit.MILLISECONDS.sleep(this.retryInterval);
                this.retryInterval <<= 1;
                this.retryInterval = Math.min(this.retryInterval, this.maxRetryInterval);
                continue;
            }
            this.retryInterval = 1000;
            this.sourceCounter.addToEventAcceptedCount((long)events.size());
            this.sourceCounter.incrementAppendBatchAcceptedCount();
            if (events.size() < this.batchSize) {
                logger.debug("The events taken from " + tf.getPath() + " is less than " + this.batchSize);
                return false;
            }
            if (++batchCount >= this.maxBatchCount) break;
        }
        logger.debug("The batches read from the same file is larger than " + this.maxBatchCount);
        return true;
    }

    private void closeTailFiles() throws IOException, InterruptedException {
        for (long inode : this.idleInodes) {
            TailFile tf = this.reader.getTailFiles().get(inode);
            if (tf.getRaf() == null) continue;
            this.tailFileProcess(tf, false);
            tf.close();
            logger.info("Closed file: " + tf.getPath() + ", inode: " + inode + ", pos: " + tf.getPos());
        }
        this.idleInodes.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writePosition() {
        File file = new File(this.positionFilePath);
        FileWriter writer = null;
        try {
            writer = new FileWriter(file);
            if (!this.existingInodes.isEmpty()) {
                String json = this.toPosInfoJson();
                writer.write(json);
            }
        }
        catch (Throwable t) {
            logger.error("Failed writing positionFile", t);
            this.sourceCounter.incrementGenericProcessingFail();
        }
        finally {
            try {
                if (writer != null) {
                    writer.close();
                }
            }
            catch (IOException e) {
                logger.error("Error: " + e.getMessage(), (Throwable)e);
                this.sourceCounter.incrementGenericProcessingFail();
            }
        }
    }

    private String toPosInfoJson() {
        ArrayList posInfos = Lists.newArrayList();
        for (Long inode : this.existingInodes) {
            TailFile tf = this.reader.getTailFiles().get(inode);
            posInfos.add(ImmutableMap.of((Object)"inode", (Object)inode, (Object)"pos", (Object)tf.getPos(), (Object)"file", (Object)tf.getPath()));
        }
        return new Gson().toJson((Object)posInfos);
    }

    private class PositionWriterRunnable
    implements Runnable {
        private PositionWriterRunnable() {
        }

        @Override
        public void run() {
            TaildirSource.this.writePosition();
        }
    }

    private class idleFileCheckerRunnable
    implements Runnable {
        private idleFileCheckerRunnable() {
        }

        @Override
        public void run() {
            try {
                long now = System.currentTimeMillis();
                for (TailFile tf : TaildirSource.this.reader.getTailFiles().values()) {
                    if (tf.getLastUpdated() + (long)TaildirSource.this.idleTimeout >= now || tf.getRaf() == null) continue;
                    TaildirSource.this.idleInodes.add(tf.getInode());
                }
            }
            catch (Throwable t) {
                logger.error("Uncaught exception in IdleFileChecker thread", t);
                TaildirSource.this.sourceCounter.incrementGenericProcessingFail();
            }
        }
    }
}

