package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.client.api.impl.FileSystemTimelineWriter;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TimelineWriter} implementation that persists timeline entities as
 * JSON lines in files on a Hadoop {@link FileSystem}.
 *
 * <p>Each entity is stored at
 * {@code <root>/entities/<cluster>/<user>/<flow>/<flowVersion>/<flowRunId>/<appId>/<entityType>/<entityId>.thist},
 * where {@code <root>} comes from {@link #TIMELINE_SERVICE_STORAGE_DIR_ROOT}
 * (defaulting to {@code <hadoop.tmp.dir>/timeline_service_data}).
 *
 * <p>File system operations are wrapped in {@link FSAction} so that an
 * {@link IOException} can be retried a configurable number of times.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileSystemTimelineWriterImpl extends AbstractService implements TimelineWriter {
    /** Configuration key for the root directory under which all data is written. */
    public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT = "yarn.timeline-service.fs-writer.root-dir";
    /** Configuration key for the number of retries after a failed FS operation. */
    public static final String TIMELINE_FS_WRITER_NUM_RETRIES = "yarn.timeline-service.fs-writer.num-retries";
    /** Default retry count; 0 means an operation is attempted exactly once. */
    public static final int DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES = 0;
    /** Configuration key for the pause between retries, in milliseconds. */
    public static final String TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = "yarn.timeline-service.fs-writer.retry-interval-ms";
    /** Default pause between retries, in milliseconds. */
    public static final long DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = 1000;
    /** Name of the sub-directory of the root that holds entity files. */
    public static final String ENTITIES_DIR = "entities";
    /** File name extension appended to the entity id. */
    public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
    private FileSystem fs;
    private Path rootPath;
    private int fsNumRetries;
    private long fsRetryInterval;
    private Path entitiesPath;
    private Configuration config;
    private static final String STORAGE_DIR_ROOT = "timeline_service_data";
    // Log under this class's own category (previously the client-side
    // FileSystemTimelineWriter class was used by mistake).
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);

    /**
     * A file system operation that can be re-attempted when it fails with an
     * {@link IOException}. The retry budget and pause are read from the
     * enclosing writer instance.
     *
     * @param <T> result type of the operation
     */
    public abstract class FSAction<T> {
        private FSAction() {
        }

        /**
         * Performs the operation once.
         *
         * @return the operation's result
         * @throws IOException if the operation fails
         */
        abstract T run() throws IOException;

        /**
         * Runs {@link #run()} and, on {@link IOException}, retries up to
         * {@code fsNumRetries} additional times, sleeping
         * {@code fsRetryInterval} milliseconds between attempts.
         *
         * @return the result of the first successful attempt
         * @throws IOException the failure of the last attempt once the retry
         *     budget is exhausted
         * @throws InterruptedException if the thread is interrupted while
         *     sleeping between attempts
         */
        T runWithRetries() throws IOException, InterruptedException {
            int retry = 0;
            while (true) {
                try {
                    return run();
                } catch (IOException e) {
                    LOG.info("Exception while executing a FS operation.", e);
                    retry++;
                    if (retry > fsNumRetries) {
                        LOG.info("Maxed out FS retries. Giving up!");
                        throw e;
                    }
                    // The interval is in milliseconds (it is passed straight to Thread.sleep).
                    LOG.info("Will retry operation on FS. Retry no. {} after sleeping for {} ms",
                            retry, fsRetryInterval);
                    Thread.sleep(fsRetryInterval);
                }
            }
        }
    }

    /** Creates the service, named after this class. */
    FileSystemTimelineWriterImpl() {
        super(FileSystemTimelineWriterImpl.class.getName());
    }

    /**
     * Writes every entity in {@code timelineEntities} to its own file below the
     * directory derived from the collector context.
     *
     * <p>A failure to write an individual entity does not abort the call; it is
     * recorded as an error in the returned response.
     *
     * @param timelineCollectorContext supplies cluster, user, flow name, flow
     *     version, flow run id and application id
     * @param timelineEntities the entities to persist
     * @param userGroupInformation not used by this implementation
     * @return a response carrying one error per entity that could not be written
     * @throws IOException declared by the interface
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter
    public TimelineWriteResponse write(TimelineCollectorContext timelineCollectorContext, TimelineEntities timelineEntities, UserGroupInformation userGroupInformation) throws IOException {
        TimelineWriteResponse response = new TimelineWriteResponse();
        String clusterId = timelineCollectorContext.getClusterId();
        String userId = timelineCollectorContext.getUserId();
        String flowName = timelineCollectorContext.getFlowName();
        String flowVersion = timelineCollectorContext.getFlowVersion();
        long flowRunId = timelineCollectorContext.getFlowRunId().longValue();
        String appId = timelineCollectorContext.getAppId();
        for (TimelineEntity entity : timelineEntities.getEntities()) {
            writeInternal(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, response);
        }
        return response;
    }

    /**
     * Domain writes are not implemented by this writer.
     *
     * @return always {@code null}
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter
    public TimelineWriteResponse write(TimelineCollectorContext timelineCollectorContext, TimelineDomain timelineDomain) throws IOException {
        return null;
    }

    /**
     * Writes one entity as a JSON line to
     * {@code <entities>/<cluster>/<user>/<flow>/<version>/<runId>/<appId>/<type>/<id>.thist}.
     * Only the flow name and flow version are passed through {@link #escape}.
     *
     * <p>Any failure is logged and added to {@code response} instead of being
     * propagated.
     */
    private synchronized void writeInternal(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, TimelineEntity entity, TimelineWriteResponse response) throws IOException {
        Path entityTypeDir = new Path(this.entitiesPath,
                clusterId + File.separator
                + userId + File.separator
                + escape(flowName) + File.separator
                + escape(flowVersion) + File.separator
                + flowRunId + File.separator
                + appId + File.separator
                + entity.getType());
        try {
            mkdirs(entityTypeDir);
            Path entityFile = new Path(entityTypeDir, entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
            createFileWithRetries(entityFile);
            byte[] record = (TimelineUtils.dumpTimelineRecordtoJSON(entity) + "\n").getBytes("UTF-8");
            writeFileWithRetries(entityFile, record);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
            }
            // Pass the exception itself so the stack trace is not lost.
            LOG.warn("Interrupted operation:" + e.getMessage(), e);
            response.addError(createTimelineWriteError(entity));
        }
    }

    /** Builds a write error identifying {@code timelineEntity} by id and type. */
    private TimelineWriteResponse.TimelineWriteError createTimelineWriteError(TimelineEntity timelineEntity) {
        TimelineWriteResponse.TimelineWriteError error = new TimelineWriteResponse.TimelineWriteError();
        error.setEntityId(timelineEntity.getId());
        error.setEntityType(timelineEntity.getType());
        return error;
    }

    /**
     * Aggregation is not implemented by this writer.
     *
     * @return always {@code null}
     */
    @Override // org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter
    public TimelineWriteResponse aggregate(TimelineEntity timelineEntity, TimelineAggregationTrack timelineAggregationTrack) throws IOException {
        return null;
    }

    /** Returns the configured root path as a string; only valid after {@link #serviceInit}. */
    @VisibleForTesting
    String getOutputRoot() {
        return this.rootPath.toString();
    }

    /**
     * Reads the root directory and retry settings from {@code configuration}
     * and obtains the {@link FileSystem} that owns the root path.
     *
     * @param configuration source of the writer's settings
     * @throws Exception if the file system cannot be obtained
     */
    public void serviceInit(Configuration configuration) throws Exception {
        String defaultRoot = configuration.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT;
        this.rootPath = new Path(configuration.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, defaultRoot));
        this.entitiesPath = new Path(this.rootPath, ENTITIES_DIR);
        this.fsNumRetries = configuration.getInt(TIMELINE_FS_WRITER_NUM_RETRIES,
                DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES);
        this.fsRetryInterval = configuration.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS,
                DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS);
        this.config = configuration;
        this.fs = this.rootPath.getFileSystem(this.config);
    }

    /**
     * Creates the root and entities directories.
     *
     * @throws Exception if a directory cannot be created within the retry budget
     */
    public void serviceStart() throws Exception {
        mkdirsWithRetries(this.rootPath);
        mkdirsWithRetries(this.entitiesPath);
    }

    /** No-op: this writer does not buffer anything. */
    @Override // org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter
    public void flush() throws IOException {
    }

    /** Creates each of the given directories unless it already exists. */
    private void mkdirs(Path... pathArr) throws IOException, InterruptedException {
        for (Path dir : pathArr) {
            if (!existsWithRetries(dir)) {
                mkdirsWithRetries(dir);
            }
        }
    }

    /** Calls {@code fs.mkdirs(path)} with retries; the boolean result is ignored. */
    private void mkdirsWithRetries(final Path path) throws IOException, InterruptedException {
        new FSAction<Void>() {
            @Override // org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.FSAction
            public Void run() throws IOException {
                fs.mkdirs(path);
                return null;
            }
        }.runWithRetries();
    }

    /** Calls {@link #writeFile(Path, byte[])} through the retry wrapper. */
    private void writeFileWithRetries(final Path path, final byte[] bArr) throws Exception {
        new FSAction<Void>() {
            @Override // org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.FSAction
            public Void run() throws IOException {
                writeFile(path, bArr);
                return null;
            }
        }.runWithRetries();
    }

    /** Calls {@link #createFile(Path)} with retries and returns its result. */
    private boolean createFileWithRetries(final Path path) throws IOException, InterruptedException {
        return new FSAction<Boolean>() {
            @Override // org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.FSAction
            public Boolean run() throws IOException {
                return createFile(path);
            }
        }.runWithRetries();
    }

    /** Calls {@code fs.exists(path)} with retries and returns its result. */
    private boolean existsWithRetries(final Path path) throws IOException, InterruptedException {
        return new FSAction<Boolean>() {
            @Override // org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.FSAction
            public Boolean run() throws IOException {
                return fs.exists(path);
            }
        }.runWithRetries();
    }

    /**
     * Delegates to {@code fs.createNewFile(path)}.
     *
     * @param path file to create
     * @return whatever {@code createNewFile} reports
     * @throws IOException if the file system call fails
     */
    public boolean createFile(Path path) throws IOException {
        return this.fs.createNewFile(path);
    }

    /**
     * Appends {@code bArr} to the file at {@code path} by rewriting it: the
     * existing content is copied to a sibling {@code <name>.tmp} file,
     * {@code bArr} is written after it, and the temp file then replaces the
     * original via delete + rename.
     *
     * <p>Both streams are closed even when a step fails, and the original file
     * is deleted only after the replacement has been completely written and
     * closed.
     *
     * <p>An {@link IOException} is logged and <em>not</em> rethrown.
     * NOTE(review): because of that, {@code writeFileWithRetries} never sees a
     * failure and therefore never retries or reports a write error; changing
     * this would alter what callers observe, so it is left as-is here.
     *
     * @param path file to append to; it must already exist
     * @param bArr bytes to append
     * @throws IOException declared for overriders; not thrown by this implementation
     */
    protected void writeFile(Path path, byte[] bArr) throws IOException {
        Path tempPath = new Path(path.getParent(), path.getName() + ".tmp");
        try {
            // Overwrite any stale temp file left behind by an earlier failed attempt.
            try (FSDataOutputStream out = this.fs.create(tempPath, true)) {
                try (FSDataInputStream in = this.fs.open(path)) {
                    // close=false: the streams are owned by the try-with-resources blocks.
                    IOUtils.copyBytes(in, out, this.config, false);
                }
                out.write(bArr);
            }
            this.fs.delete(path, false);
            if (!this.fs.rename(tempPath, path)) {
                LOG.error("Failed to rename {} to {}", tempPath, path);
            }
        } catch (IOException e) {
            LOG.error("Got an exception while writing file", e);
        }
    }

    /** Replaces every platform file separator character in {@code str} with {@code '_'}. */
    private static String escape(String str) {
        return str.replace(File.separatorChar, '_');
    }
}
