package org.apache.flume.sink;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.PathManager;
import org.apache.flume.formatter.output.PathManagerFactory;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/RollingFileSink.class */
public class RollingFileSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(RollingFileSink.class);
    private static final long defaultRollInterval = 30;
    private static final int defaultBatchSize = 100;
    private File directory;
    private long rollInterval;
    private OutputStream outputStream;
    private ScheduledExecutorService rollService;
    private String serializerType;
    private Context serializerContext;
    private EventSerializer serializer;
    private SinkCounter sinkCounter;
    private PathManager pathController;
    private int batchSize = 100;
    private volatile boolean shouldRotate = false;

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        String string = context.getString("sink.pathManager", "DEFAULT");
        String string2 = context.getString("sink.directory");
        String string3 = context.getString("sink.rollInterval");
        this.serializerType = context.getString("sink.serializer", "TEXT");
        this.serializerContext = new Context(context.getSubProperties("sink.serializer."));
        this.pathController = PathManagerFactory.getInstance(string, new Context(context.getSubProperties("sink.pathManager.")));
        Preconditions.checkArgument(string2 != null, "Directory may not be null");
        Preconditions.checkNotNull(this.serializerType, "Serializer type is undefined");
        if (string3 == null) {
            this.rollInterval = defaultRollInterval;
        } else {
            this.rollInterval = Long.parseLong(string3);
        }
        this.batchSize = context.getInteger("sink.batchSize", 100).intValue();
        this.directory = new File(string2);
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
    }

    @Override // org.apache.flume.sink.AbstractSink, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Starting {}...", this);
        this.sinkCounter.start();
        super.start();
        this.pathController.setBaseDirectory(this.directory);
        if (this.rollInterval > 0) {
            this.rollService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("rollingFileSink-roller-" + Thread.currentThread().getId() + "-%d").build());
            this.rollService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.flume.sink.RollingFileSink.1
                @Override // java.lang.Runnable
                public void run() {
                    RollingFileSink.logger.debug("Marking time to rotate file {}", RollingFileSink.this.pathController.getCurrentFile());
                    RollingFileSink.this.shouldRotate = true;
                }
            }, this.rollInterval, this.rollInterval, TimeUnit.SECONDS);
        } else {
            logger.info("RollInterval is not valid, file rolling will not happen.");
        }
        logger.info("RollingFileSink {} started.", getName());
    }

    @Override // org.apache.flume.Sink
    public Sink.Status process() throws EventDeliveryException {
        if (this.shouldRotate) {
            logger.debug("Time to rotate {}", this.pathController.getCurrentFile());
            if (this.outputStream != null) {
                logger.debug("Closing file {}", this.pathController.getCurrentFile());
                try {
                    try {
                        this.serializer.flush();
                        this.serializer.beforeClose();
                        this.outputStream.close();
                        this.sinkCounter.incrementConnectionClosedCount();
                        this.shouldRotate = false;
                        this.serializer = null;
                        this.outputStream = null;
                        this.pathController.rotate();
                    } catch (IOException e) {
                        this.sinkCounter.incrementConnectionFailedCount();
                        throw new EventDeliveryException("Unable to rotate file " + this.pathController.getCurrentFile() + " while delivering event", e);
                    }
                } catch (Throwable th) {
                    this.serializer = null;
                    this.outputStream = null;
                    throw th;
                }
            }
        }
        if (this.outputStream == null) {
            File currentFile = this.pathController.getCurrentFile();
            logger.debug("Opening output stream for file {}", currentFile);
            try {
                this.outputStream = new BufferedOutputStream(new FileOutputStream(currentFile));
                this.serializer = EventSerializerFactory.getInstance(this.serializerType, this.serializerContext, this.outputStream);
                this.serializer.afterCreate();
                this.sinkCounter.incrementConnectionCreatedCount();
            } catch (IOException e2) {
                this.sinkCounter.incrementConnectionFailedCount();
                throw new EventDeliveryException("Failed to open file " + this.pathController.getCurrentFile() + " while delivering event", e2);
            }
        }
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Sink.Status status = Sink.Status.READY;
        try {
            try {
                transaction.begin();
                int i = 0;
                int i2 = 0;
                while (true) {
                    if (i2 < this.batchSize) {
                        Event take = channel.take();
                        if (take == null) {
                            status = Sink.Status.BACKOFF;
                            break;
                        }
                        this.sinkCounter.incrementEventDrainAttemptCount();
                        i++;
                        this.serializer.write(take);
                        i2++;
                    } else {
                        break;
                    }
                }
                this.serializer.flush();
                this.outputStream.flush();
                transaction.commit();
                this.sinkCounter.addToEventDrainSuccessCount(i);
                transaction.close();
                return status;
            } catch (Exception e3) {
                transaction.rollback();
                throw new EventDeliveryException("Failed to process transaction", e3);
            }
        } catch (Throwable th2) {
            transaction.close();
            throw th2;
        }
    }

    @Override // org.apache.flume.sink.AbstractSink, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("RollingFile sink {} stopping...", getName());
        this.sinkCounter.stop();
        super.stop();
        if (this.outputStream != null) {
            logger.debug("Closing file {}", this.pathController.getCurrentFile());
            try {
                this.serializer.flush();
                this.serializer.beforeClose();
                this.outputStream.close();
                this.sinkCounter.incrementConnectionClosedCount();
            } catch (IOException e) {
                this.sinkCounter.incrementConnectionFailedCount();
                logger.error("Unable to close output stream. Exception follows.", e);
            } finally {
                this.outputStream = null;
                this.serializer = null;
            }
        }
        if (this.rollInterval > 0) {
            this.rollService.shutdown();
            while (!this.rollService.isTerminated()) {
                try {
                    this.rollService.awaitTermination(1L, TimeUnit.SECONDS);
                } catch (InterruptedException e2) {
                    logger.debug("Interrupted while waiting for roll service to stop. Please report this.", e2);
                }
            }
        }
        logger.info("RollingFile sink {} stopped. Event metrics: {}", getName(), this.sinkCounter);
    }

    public File getDirectory() {
        return this.directory;
    }

    public void setDirectory(File file) {
        this.directory = file;
    }

    public long getRollInterval() {
        return this.rollInterval;
    }

    public void setRollInterval(long j) {
        this.rollInterval = j;
    }
}
