package org.apache.flume.source;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.serialization.LineDeserializer;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.annotations.VisibleForTesting;
import org.spark-project.guava.base.Preconditions;
import org.spark-project.guava.base.Throwables;

/* loaded from: input_file:org/apache/flume/source/SpoolDirectorySource.class */
/**
 * Event-driven source that pulls batches of events from a
 * {@link ReliableSpoolingFileEventReader} pointed at a spooling directory and hands them to
 * this source's channel processor.
 *
 * <p>Lifecycle: {@link #configure(Context)} reads the settings, {@link #start()} builds the
 * reader and schedules a polling task on a single-thread scheduled executor, and
 * {@link #stop()} shuts that executor down. The three lifecycle methods are
 * {@code synchronized}; the flags shared with the polling thread are {@code volatile}.
 */
public class SpoolDirectorySource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class);

    /** Delay, in milliseconds, between the end of one polling run and the start of the next. */
    private static final int POLL_DELAY_MS = 500;

    /** Back-off, in milliseconds, a polling run starts with and returns to after a delivered batch. */
    private static final int INITIAL_BACKOFF_MS = 250;

    // Settings captured by configure() and passed to the reader builder in start().
    private String completedSuffix;
    private String spoolDirectory;
    private boolean fileHeader;
    private String fileHeaderKey;
    private boolean basenameHeader;
    private String basenameHeaderKey;
    private int batchSize;
    private String ignorePattern;
    private String trackerDirPath;
    private String deserializerType;
    private Context deserializerContext;
    private String deletePolicy;
    private String inputCharset;
    private DecodeErrorPolicy decodeErrorPolicy;
    private SpoolDirectorySourceConfigurationConstants.ConsumeOrder consumeOrder;

    /** Upper bound, in milliseconds, for the back-off applied after a channel rejection. */
    private int maxBackoff;

    private SourceCounter sourceCounter;

    /** Reader created by {@link #start()}; package-private (not {@code private}) as in the original. */
    ReliableSpoolingFileEventReader reader;

    /** Executor running the polling task; {@code null} until {@link #start()} has created it. */
    private ScheduledExecutorService executor;

    // The flags below are written on one thread (polling thread or caller of the test hooks)
    // and read on another, hence volatile.
    private volatile boolean hasFatalError = false;
    private volatile boolean backoff = true;
    private volatile boolean hitChannelException = false;

    /**
     * Polling task: repeatedly reads a batch from the reader and passes it to the channel
     * processor until the reader returns an empty batch or the thread is interrupted.
     */
    private class SpoolDirectoryRunnable implements Runnable {
        private final ReliableSpoolingFileEventReader reader;
        private final SourceCounter sourceCounter;

        public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reliableSpoolingFileEventReader, SourceCounter sourceCounter) {
            this.reader = reliableSpoolingFileEventReader;
            this.sourceCounter = sourceCounter;
        }

        /**
         * Runs one polling pass.
         *
         * <p>A {@link ChannelException} is not fatal: the flag behind
         * {@link SpoolDirectorySource#hitChannelException()} is raised and, when back-off is
         * enabled, the thread sleeps before the next read, doubling the sleep up to
         * {@code maxBackoff}. An interrupt during that sleep ends the pass quietly with the
         * interrupt flag restored. Any other throwable is logged as fatal, recorded in
         * {@code hasFatalError} and rethrown.
         */
        @Override
        public void run() {
            int backoffInterval = INITIAL_BACKOFF_MS;
            try {
                while (!Thread.interrupted()) {
                    List<Event> events = this.reader.readEvents(SpoolDirectorySource.this.batchSize);
                    if (events.isEmpty()) {
                        // Nothing to deliver right now; the executor schedules the next pass.
                        break;
                    }
                    this.sourceCounter.addToEventReceivedCount(events.size());
                    this.sourceCounter.incrementAppendBatchReceivedCount();
                    try {
                        SpoolDirectorySource.this.getChannelProcessor().processEventBatch(events);
                        // Commit only after the channel processor accepted the batch.
                        this.reader.commit();
                        backoffInterval = INITIAL_BACKOFF_MS;
                        this.sourceCounter.addToEventAcceptedCount(events.size());
                        this.sourceCounter.incrementAppendBatchAcceptedCount();
                    } catch (ChannelException e) {
                        logger.warn("The channel is full, and cannot write data now. The source will try again after {} milliseconds", backoffInterval);
                        SpoolDirectorySource.this.hitChannelException = true;
                        if (SpoolDirectorySource.this.backoff) {
                            try {
                                TimeUnit.MILLISECONDS.sleep(backoffInterval);
                            } catch (InterruptedException interrupted) {
                                // Typically caused by stop() -> shutdownNow(); this is a
                                // shutdown request, not a fatal error of the source.
                                Thread.currentThread().interrupt();
                                logger.info("Interrupted while backing off after a channel exception; ending this polling run");
                                return;
                            }
                            backoffInterval = Math.min(backoffInterval << 1, SpoolDirectorySource.this.maxBackoff);
                        }
                    }
                }
            } catch (Throwable t) {
                // toString() must be qualified: unqualified it would resolve to this Runnable.
                logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.", t);
                SpoolDirectorySource.this.hasFatalError = true;
                // Rethrowing ends this execution exceptionally, which stops further scheduling.
                Throwables.propagate(t);
            }
        }
    }

    /**
     * Builds the reader from the configured settings and schedules the polling task with a
     * fixed delay of {@link #POLL_DELAY_MS} milliseconds.
     *
     * <p>The reader is built before the executor is created so that a failing build does not
     * leave an executor behind.
     *
     * @throws FlumeException if building the reader fails with an {@link IOException}
     */
    @Override
    public synchronized void start() {
        logger.info("SpoolDirectorySource source starting with directory: {}", this.spoolDirectory);
        try {
            this.reader = new ReliableSpoolingFileEventReader.Builder()
                    .spoolDirectory(new File(this.spoolDirectory))
                    .completedSuffix(this.completedSuffix)
                    .ignorePattern(this.ignorePattern)
                    .trackerDirPath(this.trackerDirPath)
                    .annotateFileName(Boolean.valueOf(this.fileHeader))
                    .fileNameHeader(this.fileHeaderKey)
                    .annotateBaseName(Boolean.valueOf(this.basenameHeader))
                    .baseNameHeader(this.basenameHeaderKey)
                    .deserializerType(this.deserializerType)
                    .deserializerContext(this.deserializerContext)
                    .deletePolicy(this.deletePolicy)
                    .inputCharset(this.inputCharset)
                    .decodeErrorPolicy(this.decodeErrorPolicy)
                    .consumeOrder(this.consumeOrder)
                    .build();
        } catch (IOException e) {
            throw new FlumeException("Error instantiating spooling event parser", e);
        }
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.executor.scheduleWithFixedDelay(new SpoolDirectoryRunnable(this.reader, this.sourceCounter), 0L, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
        super.start();
        logger.debug("SpoolDirectorySource source started");
        this.sourceCounter.start();
    }

    /**
     * Stops the polling executor (orderly shutdown, up to 10 seconds of waiting, then a forced
     * shutdown), then stops the source and its counter.
     *
     * <p>Safe to call when {@link #start()} never created an executor. If the calling thread is
     * interrupted while waiting, its interrupt status is restored.
     */
    @Override
    public synchronized void stop() {
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(10L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.info("Interrupted while awaiting termination", e);
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            this.executor.shutdownNow();
        }
        super.stop();
        this.sourceCounter.stop();
        logger.info("SpoolDir source {} stopped. Metrics: {}", getName(), this.sourceCounter);
    }

    /** Returns a short description containing the source name and the spooling directory. */
    @Override
    public String toString() {
        return "Spool Directory source " + getName() + ": { spoolDir: " + this.spoolDirectory + " }";
    }

    /**
     * Reads this source's settings from {@code context}.
     *
     * <p>The spooling directory is mandatory; every other setting falls back to the default
     * passed alongside its key. The source counter is created on the first call only.
     *
     * @param context configuration to read from
     * @throws IllegalStateException if no spooling directory is configured
     *         (raised by {@code Preconditions.checkState})
     */
    @Override
    public synchronized void configure(Context context) {
        this.spoolDirectory = context.getString(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY);
        Preconditions.checkState(this.spoolDirectory != null, "Configuration must specify a spooling directory");

        this.completedSuffix = context.getString(SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX, SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX);
        this.deletePolicy = context.getString(SpoolDirectorySourceConfigurationConstants.DELETE_POLICY, SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY);
        this.fileHeader = context.getBoolean(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER, false).booleanValue();
        this.fileHeaderKey = context.getString(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER_KEY, SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY);
        this.basenameHeader = context.getBoolean(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER, false).booleanValue();
        this.basenameHeaderKey = context.getString(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER_KEY, SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER_KEY);
        this.batchSize = context.getInteger("batchSize", 100).intValue();
        this.inputCharset = context.getString(SpoolDirectorySourceConfigurationConstants.INPUT_CHARSET, "UTF-8");

        // Enum-valued settings are upper-cased with a fixed locale before the enum lookup.
        String decodePolicyName = context.getString(SpoolDirectorySourceConfigurationConstants.DECODE_ERROR_POLICY, SpoolDirectorySourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY);
        this.decodeErrorPolicy = DecodeErrorPolicy.valueOf(decodePolicyName.toUpperCase(Locale.ENGLISH));

        this.ignorePattern = context.getString(SpoolDirectorySourceConfigurationConstants.IGNORE_PAT, SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT);
        this.trackerDirPath = context.getString(SpoolDirectorySourceConfigurationConstants.TRACKER_DIR, SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR);
        this.deserializerType = context.getString(SpoolDirectorySourceConfigurationConstants.DESERIALIZER, SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER);
        this.deserializerContext = new Context(context.getSubProperties("deserializer."));

        String consumeOrderName = context.getString(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER.toString());
        this.consumeOrder = SpoolDirectorySourceConfigurationConstants.ConsumeOrder.valueOf(consumeOrderName.toUpperCase(Locale.ENGLISH));

        // A configured BUFFER_MAX_LINE_LENGTH is forwarded to the deserializer context, but
        // only when the default deserializer type is selected.
        Integer bufferMaxLineLength = context.getInteger(SpoolDirectorySourceConfigurationConstants.BUFFER_MAX_LINE_LENGTH);
        if (bufferMaxLineLength != null && this.deserializerType != null && this.deserializerType.equalsIgnoreCase(SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER)) {
            this.deserializerContext.put(LineDeserializer.MAXLINE_KEY, bufferMaxLineLength.toString());
        }

        this.maxBackoff = context.getInteger("maxBackoff", SpoolDirectorySourceConfigurationConstants.DEFAULT_MAX_BACKOFF).intValue();
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }
    }

    /** Returns whether the polling task has recorded a fatal error. */
    @VisibleForTesting
    protected boolean hasFatalError() {
        return this.hasFatalError;
    }

    /**
     * Enables or disables sleeping after a channel rejection.
     *
     * @param z {@code true} to back off, {@code false} to retry without sleeping
     */
    @VisibleForTesting
    protected void setBackOff(boolean z) {
        this.backoff = z;
    }

    /** Returns whether the polling task has caught a {@link ChannelException} at least once. */
    @VisibleForTesting
    protected boolean hitChannelException() {
        return this.hitChannelException;
    }

    /** Returns the counter used by this source; {@code null} until {@link #configure(Context)} has run. */
    @VisibleForTesting
    protected SourceCounter getSourceCounter() {
        return this.sourceCounter;
    }
}
