package org.apache.tez.dag.history.logging.proto;

import java.io.IOException;
import java.time.LocalDate;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos;
import org.apache.tez.dag.records.TezDAGID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.class */
public class ProtoHistoryLoggingService extends HistoryLoggingService {
    private static final Logger LOG = LoggerFactory.getLogger(ProtoHistoryLoggingService.class);
    static final String SPLIT_DAG_EVENTS_FILE_SUFFIX = "_1";
    private final HistoryEventProtoConverter converter;
    private boolean loggingDisabled;
    private LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
    private Thread eventHandlingThread;
    private final AtomicBoolean stopped;
    private TezProtoLoggers loggers;
    private ProtoMessageWriter<HistoryLoggerProtos.HistoryEventProto> appEventsWriter;
    private ProtoMessageWriter<HistoryLoggerProtos.HistoryEventProto> dagEventsWriter;
    private ProtoMessageWriter<HistoryLoggerProtos.ManifestEntryProto> manifestEventsWriter;
    private LocalDate manifestDate;
    private TezDAGID currentDagId;
    private long dagSubmittedEventOffset;
    private String appEventsFile;
    private long appLaunchedEventOffset;
    private boolean splitDagStartEvents;

    public ProtoHistoryLoggingService() {
        super(ProtoHistoryLoggingService.class.getName());
        this.converter = new HistoryEventProtoConverter();
        this.loggingDisabled = false;
        this.stopped = new AtomicBoolean(false);
        this.dagSubmittedEventOffset = -1L;
    }

    protected void serviceInit(Configuration configuration) throws Exception {
        LOG.info("Initing ProtoHistoryLoggingService");
        setConfig(configuration);
        this.loggingDisabled = !configuration.getBoolean("tez.am.history.logging.enabled", true);
        this.splitDagStartEvents = configuration.getBoolean("tez.history.logging.split-dag-start", false);
        int i = configuration.getInt("tez.history.logging.queue.size", 100000);
        this.eventQueue = new LinkedBlockingQueue<>(i);
        LOG.info("Inited ProtoHistoryLoggingService. loggingDisabled: {} splitDagStartEvents: {} queueSize: {}", new Object[]{Boolean.valueOf(this.loggingDisabled), Boolean.valueOf(this.splitDagStartEvents), Integer.valueOf(i)});
    }

    protected void serviceStart() throws Exception {
        LOG.info("Starting ProtoHistoryLoggingService");
        if (!this.loggingDisabled) {
            this.loggers = new TezProtoLoggers();
            if (!this.loggers.setup(getConfig(), this.appContext.getClock())) {
                LOG.warn("Log file location for ProtoHistoryLoggingService not specified, logging disabled");
                this.loggingDisabled = true;
                return;
            } else {
                this.appEventsWriter = this.loggers.getAppEventsLogger().getWriter(this.appContext.getApplicationAttemptId().toString());
                this.eventHandlingThread = new Thread(this::loop, "HistoryEventHandlingThread");
                this.eventHandlingThread.start();
            }
        }
        LOG.info("Started ProtoHistoryLoggingService");
    }

    protected void serviceStop() throws Exception {
        LOG.info("Stopping ProtoHistoryLoggingService, eventQueueBacklog=" + this.eventQueue.size());
        this.stopped.set(true);
        this.eventHandlingThread.join();
        IOUtils.closeQuietly(this.appEventsWriter);
        IOUtils.closeQuietly(this.dagEventsWriter);
        IOUtils.closeQuietly(this.manifestEventsWriter);
        LOG.info("Stopped ProtoHistoryLoggingService");
    }

    public void handle(DAGHistoryEvent dAGHistoryEvent) {
        if (this.loggingDisabled || this.stopped.get()) {
            return;
        }
        try {
            this.eventQueue.add(dAGHistoryEvent);
        } catch (IllegalStateException e) {
            LOG.error("Queue capacity filled up, ignoring event: " + dAGHistoryEvent.getHistoryEvent().getEventType());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Queue capacity filled up, ignoring event: {}", dAGHistoryEvent.getHistoryEvent());
            }
        }
    }

    private void loop() {
        while (true) {
            if (this.stopped.get() && this.eventQueue.isEmpty()) {
                return;
            }
            DAGHistoryEvent dAGHistoryEvent = null;
            try {
                dAGHistoryEvent = this.eventQueue.poll(100L, TimeUnit.MILLISECONDS);
                if (dAGHistoryEvent != null) {
                    handleEvent(dAGHistoryEvent);
                }
            } catch (IOException e) {
                LOG.error("Got exception while handling event {} for dag {}.", new Object[]{dAGHistoryEvent.getHistoryEvent().getEventType(), dAGHistoryEvent.getDagID(), e});
            } catch (InterruptedException e2) {
                LOG.info("EventQueue poll interrupted, ignoring it.", e2);
            }
        }
    }

    private void handleEvent(DAGHistoryEvent dAGHistoryEvent) throws IOException {
        if (this.loggingDisabled) {
            return;
        }
        HistoryEvent historyEvent = dAGHistoryEvent.getHistoryEvent();
        if (dAGHistoryEvent.getDagID() == null) {
            if (historyEvent.getEventType() == HistoryEventType.APP_LAUNCHED) {
                this.appEventsFile = this.appEventsWriter.getPath().toString();
                this.appLaunchedEventOffset = this.appEventsWriter.getOffset();
            }
            this.appEventsWriter.writeProto(this.converter.convert(historyEvent));
            return;
        }
        HistoryEventType eventType = historyEvent.getEventType();
        TezDAGID dagID = dAGHistoryEvent.getDagID();
        if (eventType == HistoryEventType.DAG_FINISHED) {
            finishCurrentDag((DAGFinishedEvent) historyEvent);
            return;
        }
        if (eventType == HistoryEventType.DAG_SUBMITTED) {
            finishCurrentDag(null);
            this.currentDagId = dagID;
            this.dagEventsWriter = this.loggers.getDagEventsLogger().getWriter(dagID.toString() + "_" + this.appContext.getApplicationAttemptId().getAttemptId());
            this.dagSubmittedEventOffset = this.dagEventsWriter.getOffset();
            this.dagEventsWriter.writeProto(this.converter.convert(historyEvent));
            return;
        }
        if (this.dagEventsWriter != null) {
            this.dagEventsWriter.writeProto(this.converter.convert(historyEvent));
            if (this.splitDagStartEvents && eventType == HistoryEventType.DAG_STARTED) {
                finishCurrentDag(null);
                this.dagEventsWriter = this.loggers.getDagEventsLogger().getWriter(dagID.toString() + "_" + this.appContext.getApplicationAttemptId().getAttemptId() + SPLIT_DAG_EVENTS_FILE_SUFFIX);
            }
        }
    }

    private void finishCurrentDag(DAGFinishedEvent dAGFinishedEvent) throws IOException {
        if (this.dagEventsWriter == null) {
            return;
        }
        long j = -1;
        if (dAGFinishedEvent != null) {
            try {
                j = this.dagEventsWriter.getOffset();
                this.dagEventsWriter.writeProto(this.converter.convert(dAGFinishedEvent));
            } catch (Throwable th) {
                IOUtils.closeQuietly(this.dagEventsWriter);
                this.dagEventsWriter = null;
                this.dagSubmittedEventOffset = -1L;
                throw th;
            }
        }
        DatePartitionedLogger<HistoryLoggerProtos.ManifestEntryProto> manifestEventsLogger = this.loggers.getManifestEventsLogger();
        if (this.manifestDate == null || !this.manifestDate.equals(manifestEventsLogger.getNow().toLocalDate())) {
            IOUtils.closeQuietly(this.manifestEventsWriter);
            this.manifestEventsWriter = manifestEventsLogger.getWriter(this.appContext.getApplicationAttemptId().toString());
            this.manifestDate = manifestEventsLogger.getDateFromDir(this.manifestEventsWriter.getPath().getParent().getName());
        }
        HistoryLoggerProtos.ManifestEntryProto.Builder writeTime = HistoryLoggerProtos.ManifestEntryProto.newBuilder().setDagId(this.currentDagId.toString()).setAppId(this.currentDagId.getApplicationId().toString()).setDagSubmittedEventOffset(this.dagSubmittedEventOffset).setDagFinishedEventOffset(j).setDagFilePath(this.dagEventsWriter.getPath().toString()).setAppFilePath(this.appEventsFile).setAppLaunchedEventOffset(this.appLaunchedEventOffset).setWriteTime(System.currentTimeMillis());
        if (dAGFinishedEvent != null) {
            writeTime.setDagId(dAGFinishedEvent.getDagID().toString());
        }
        this.manifestEventsWriter.writeProto(writeTime.build());
        this.manifestEventsWriter.hflush();
        this.appEventsWriter.hflush();
        IOUtils.closeQuietly(this.dagEventsWriter);
        this.dagEventsWriter = null;
        this.dagSubmittedEventOffset = -1L;
    }
}
